SESSIONS = new ConcurrentHashMap<>();
+
+ private final String id = UUID.randomUUID().toString();
+
+ private MQTTProtocolHandler protocolHandler;
+
+ private MQTTSubscriptionManager subscriptionManager;
+
+ private MQTTSessionCallback sessionCallback;
+
+ private ServerSessionImpl serverSession;
+
+ private MQTTPublishManager mqttPublishManager;
+
+ private MQTTConnectionManager mqttConnectionManager;
+
+ private MQTTRetainMessageManager retainMessageManager;
+
+ private MQTTConnection connection;
+
+ protected MQTTSessionState state;
+
+ private boolean stopped = false;
+
+ private MQTTLogger log = MQTTLogger.LOGGER;
+
+ public MQTTSession( MQTTProtocolHandler protocolHandler, MQTTConnection connection) throws Exception
+ {
+ this.protocolHandler = protocolHandler;
+ this.connection = connection;
+
+ mqttConnectionManager = new MQTTConnectionManager(this);
+ mqttPublishManager = new MQTTPublishManager(this);
+ sessionCallback = new MQTTSessionCallback(this);
+ subscriptionManager = new MQTTSubscriptionManager(this);
+ retainMessageManager = new MQTTRetainMessageManager(this);
+
+ log.debug("SESSION CREATED: " + id);
+ }
+
+ // Called after the client has Connected.
+ synchronized void start() throws Exception
+ {
+ mqttPublishManager.start();
+ subscriptionManager.start();
+ stopped = false;
+ }
+
+ // TODO ensure resources are cleaned up for GC.
+ synchronized void stop() throws Exception
+ {
+ if (!stopped)
+ {
+ protocolHandler.stop(false);
+ // TODO this should pass in clean session.
+ subscriptionManager.stop(false);
+ mqttPublishManager.stop(false);
+
+ if (serverSession != null)
+ {
+ serverSession.stop();
+ serverSession.close(false);
+ }
+
+ if (state != null)
+ {
+ state.setAttached(false);
+ }
+ }
+ stopped = true;
+ }
+
+ boolean getStopped()
+ {
+ return stopped;
+ }
+
+ MQTTPublishManager getMqttPublishManager()
+ {
+ return mqttPublishManager;
+ }
+
+ MQTTSessionState getState()
+ {
+ return state;
+ }
+
+ MQTTConnectionManager getConnectionManager()
+ {
+ return mqttConnectionManager;
+ }
+
+ MQTTSessionState getSessionState()
+ {
+ return state;
+ }
+
+ ServerSessionImpl getServerSession()
+ {
+ return serverSession;
+ }
+
+ ActiveMQServer getServer()
+ {
+ return protocolHandler.getServer();
+ }
+
+ MQTTSubscriptionManager getSubscriptionManager()
+ {
+ return subscriptionManager;
+ }
+
+ MQTTProtocolHandler getProtocolHandler()
+ {
+ return protocolHandler;
+ }
+
+ SessionCallback getSessionCallback()
+ {
+ return sessionCallback;
+ }
+
+ void setServerSession(ServerSessionImpl serverSession)
+ {
+ this.serverSession = serverSession;
+ }
+
+ void setSessionState(MQTTSessionState state)
+ {
+ this.state = state;
+ state.setAttached(true);
+ }
+
+ MQTTRetainMessageManager getRetainMessageManager()
+ {
+ return retainMessageManager;
+ }
+
+ MQTTConnection getConnection()
+ {
+ return connection;
+ }
+}
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
new file mode 100644
index 0000000000..63e19a5005
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
+
+public class MQTTSessionCallback implements SessionCallback
+{
+ private MQTTSession session;
+
+ private MQTTLogger log = MQTTLogger.LOGGER;
+
+ public MQTTSessionCallback(MQTTSession session) throws Exception
+ {
+ this.session = session;
+ }
+
+ @Override
+ public int sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount)
+ {
+ try
+ {
+ session.getMqttPublishManager().sendMessage(message, consumer, deliveryCount);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage());
+ }
+ return 1;
+ }
+
+ @Override
+ public int sendLargeMessageContinuation(ServerConsumer consumerID, byte[] body, boolean continues, boolean requiresResponse)
+ {
+ log.warn("Sending LARGE MESSAGE");
+ return 1;
+ }
+
+ @Override
+ public void addReadyListener(ReadyListener listener)
+ {
+ session.getConnection().getTransportConnection().addReadyListener(listener);
+ }
+
+ @Override
+ public void removeReadyListener(ReadyListener listener)
+ {
+ session.getConnection().getTransportConnection().removeReadyListener(listener);
+ }
+
+ @Override
+ public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount)
+ {
+ return sendMessage(message, consumer, deliveryCount);
+ }
+
+ @Override
+ public void disconnect(ServerConsumer consumer, String queueName)
+ {
+ try
+ {
+ consumer.removeItself();
+ }
+ catch (Exception e)
+ {
+ log.error(e.getMessage());
+ }
+ }
+
+ @Override
+ public boolean hasCredits(ServerConsumer consumerID)
+ {
+ return true;
+ }
+
+ @Override
+ public void sendProducerCreditsMessage(int credits, SimpleString address)
+ {
+ }
+
+ @Override
+ public void sendProducerCreditsFailMessage(int credits, SimpleString address)
+ {
+ }
+
+ @Override
+ public void closed()
+ {
+ }
+
+}
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
new file mode 100644
index 0000000000..b7fa43624f
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import io.netty.handler.codec.mqtt.MqttTopicSubscription;
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class MQTTSessionState
+{
+ private String clientId;
+
+ private ServerMessage willMessage;
+
+ private final ConcurrentHashMap subscriptions = new ConcurrentHashMap<>();
+
+ // Used to store Packet ID of Publish QoS1 and QoS2 message. See spec: 4.3.3 QoS 2: Exactly once delivery. Method B.
+ private Map messageRefStore;
+
+ private Map> addressMessageMap;
+
+ private Set pubRec;
+
+ private Set pub;
+
+ private boolean attached = false;
+
+ private MQTTLogger log = MQTTLogger.LOGGER;
+
+ // Objects track the Outbound message references
+ private Map> outboundMessageReferenceStore;
+
+ private ConcurrentHashMap> reverseOutboundReferenceStore;
+
+ private final Object outboundLock = new Object();
+
+ // FIXME We should use a better mechanism for creating packet IDs.
+ private AtomicInteger lastId = new AtomicInteger(0);
+
+ public MQTTSessionState(String clientId)
+ {
+ this.clientId = clientId;
+
+ pubRec = new HashSet<>();
+ pub = new HashSet<>();
+
+ outboundMessageReferenceStore = new ConcurrentHashMap<>();
+ reverseOutboundReferenceStore = new ConcurrentHashMap<>();
+
+ messageRefStore = new ConcurrentHashMap<>();
+ addressMessageMap = new ConcurrentHashMap<>();
+ }
+
+ int generateId()
+ {
+ lastId.compareAndSet(Short.MAX_VALUE, 1);
+ return lastId.addAndGet(1);
+ }
+
+ void addOutbandMessageRef(int mqttId, String address, long serverMessageId, int qos)
+ {
+ synchronized (outboundLock)
+ {
+ outboundMessageReferenceStore.put(mqttId, new Pair(address, serverMessageId));
+ if (qos == 2)
+ {
+ if (reverseOutboundReferenceStore.containsKey(address))
+ {
+ reverseOutboundReferenceStore.get(address).put(serverMessageId, mqttId);
+ }
+ else
+ {
+ ConcurrentHashMap serverToMqttId = new ConcurrentHashMap();
+ serverToMqttId.put(serverMessageId, mqttId);
+ reverseOutboundReferenceStore.put(address, serverToMqttId);
+ }
+ }
+ }
+ }
+
+ Pair removeOutbandMessageRef(int mqttId, int qos)
+ {
+ synchronized (outboundLock)
+ {
+ Pair messageInfo = outboundMessageReferenceStore.remove(mqttId);
+ if (qos == 1)
+ {
+ return messageInfo;
+ }
+
+ Map map = reverseOutboundReferenceStore.get(messageInfo.getA());
+ if (map != null)
+ {
+ map.remove(messageInfo.getB());
+ if (map.isEmpty())
+ {
+ reverseOutboundReferenceStore.remove(messageInfo.getA());
+ }
+ return messageInfo;
+ }
+ return null;
+ }
+ }
+
+ Set getPubRec()
+ {
+ return pubRec;
+ }
+
+ Set getPub()
+ {
+ return pub;
+ }
+
+ boolean getAttached()
+ {
+ return attached;
+ }
+
+ void setAttached(boolean attached)
+ {
+ this.attached = attached;
+ }
+
+ boolean isWill()
+ {
+ return willMessage != null;
+ }
+
+ ServerMessage getWillMessage()
+ {
+ return willMessage;
+ }
+
+ void setWillMessage(ServerMessage willMessage)
+ {
+ this.willMessage = willMessage;
+ }
+
+ void deleteWillMessage()
+ {
+ willMessage = null;
+ }
+
+ Collection getSubscriptions()
+ {
+ return subscriptions.values();
+ }
+
+ boolean addSubscription(MqttTopicSubscription subscription)
+ {
+ synchronized (subscriptions)
+ {
+ addressMessageMap.putIfAbsent(MQTTUtil.convertMQTTAddressFilterToCore(subscription.topicName()), new ConcurrentHashMap());
+
+ MqttTopicSubscription existingSubscription = subscriptions.get(subscription.topicName());
+ if (existingSubscription != null)
+ {
+ if (subscription.qualityOfService().value() > existingSubscription.qualityOfService().value())
+ {
+ subscriptions.put(subscription.topicName(), subscription);
+ return true;
+ }
+ }
+ else
+ {
+ subscriptions.put(subscription.topicName(), subscription);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ void removeSubscription(String address)
+ {
+ synchronized (subscriptions)
+ {
+ subscriptions.remove(address);
+ addressMessageMap.remove(address);
+ }
+ }
+
+ MqttTopicSubscription getSubscription(String address)
+ {
+ return subscriptions.get(address);
+ }
+
+ String getClientId()
+ {
+ return clientId;
+ }
+
+ void setClientId(String clientId)
+ {
+ this.clientId = clientId;
+ }
+
+ void storeMessageRef(Integer mqttId, MQTTMessageInfo messageInfo, boolean storeAddress)
+ {
+ messageRefStore.put(mqttId, messageInfo);
+ if (storeAddress)
+ {
+ Map addressMap = addressMessageMap.get(messageInfo.getAddress());
+ if (addressMap != null)
+ {
+ addressMap.put(messageInfo.getServerMessageId(), mqttId);
+ }
+ }
+ }
+
+ void removeMessageRef(Integer mqttId)
+ {
+ MQTTMessageInfo info = messageRefStore.remove(mqttId);
+ if (info != null)
+ {
+ Map addressMap = addressMessageMap.get(info.getAddress());
+ if (addressMap != null)
+ {
+ addressMap.remove(info.getServerMessageId());
+ }
+ }
+ }
+
+ MQTTMessageInfo getMessageInfo(Integer mqttId)
+ {
+ return messageRefStore.get(mqttId);
+ }
+}
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
new file mode 100644
index 0000000000..e7ac143270
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import io.netty.handler.codec.mqtt.MqttTopicSubscription;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class MQTTSubscriptionManager
+{
+ private MQTTSession session;
+
+ private ConcurrentHashMap consumerQoSLevels;
+
+ private ConcurrentHashMap consumers;
+
+ private MQTTLogger log = MQTTLogger.LOGGER;
+
+ public MQTTSubscriptionManager(MQTTSession session)
+ {
+ this.session = session;
+
+ consumers = new ConcurrentHashMap<>();
+ consumerQoSLevels = new ConcurrentHashMap<>();
+ }
+
+ synchronized void start() throws Exception
+ {
+ for (MqttTopicSubscription subscription : session.getSessionState().getSubscriptions())
+ {
+ SimpleString q = createQueueForSubscription(subscription.topicName(), subscription.qualityOfService().value());
+ createConsumerForSubscriptionQueue(q, subscription.topicName(), subscription.qualityOfService().value());
+ }
+ }
+
+ synchronized void stop(boolean clean) throws Exception
+ {
+ for (ServerConsumer consumer : consumers.values())
+ {
+ consumer.setStarted(false);
+ consumer.disconnect();
+ consumer.getQueue().removeConsumer(consumer);
+ consumer.close(false);
+ }
+
+ if (clean)
+ {
+ for (ServerConsumer consumer : consumers.values())
+ {
+ session.getServer().destroyQueue(consumer.getQueue().getName());
+ }
+ }
+ }
+
+ /**
+ * Creates a Queue if it doesn't already exist, based on a topic and address. Returning the queue name.
+ */
+ private SimpleString createQueueForSubscription(String topic, int qos) throws Exception
+ {
+ String address = MQTTUtil.convertMQTTAddressFilterToCore(topic);
+ SimpleString queue = getQueueNameForTopic(address);
+
+ Queue q = session.getServer().locateQueue(queue);
+ if (q == null)
+ {
+ session.getServerSession().createQueue(new SimpleString(address), queue, null, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0);
+ }
+ return queue;
+ }
+
+ /**
+ * Creates a new consumer for the queue associated with a subscription
+ */
+ private void createConsumerForSubscriptionQueue(SimpleString queue, String topic, int qos) throws Exception
+ {
+ long cid = session.getServer().getStorageManager().generateID();
+
+ ServerConsumer consumer = session.getServerSession().createConsumer(cid, queue, null, false, true, -1);
+ consumer.setStarted(true);
+
+ consumers.put(topic, consumer);
+ consumerQoSLevels.put(cid, qos);
+ }
+
+
+ private void addSubscription(MqttTopicSubscription subscription) throws Exception
+ {
+ MqttTopicSubscription s = session.getSessionState().getSubscription(subscription.topicName());
+
+ int qos = subscription.qualityOfService().value();
+ String topic = subscription.topicName();
+
+ session.getSessionState().addSubscription(subscription);
+
+ SimpleString q = createQueueForSubscription(topic, qos);
+
+ if (s == null)
+ {
+ createConsumerForSubscriptionQueue(q, topic, qos);
+ }
+ else
+ {
+ consumerQoSLevels.put(consumers.get(topic).getID(), qos);
+ }
+ session.getRetainMessageManager().addRetainedMessagesToQueue(q, topic);
+ }
+
+ void removeSubscriptions(List topics) throws Exception
+ {
+ for (String topic : topics)
+ {
+ removeSubscription(topic);
+ }
+ }
+
+ private synchronized void removeSubscription(String address) throws Exception
+ {
+ ServerConsumer consumer = consumers.get(address);
+ String internalAddress = MQTTUtil.convertMQTTAddressFilterToCore(address);
+ SimpleString internalQueueName = getQueueNameForTopic(internalAddress);
+
+ Queue queue = session.getServer().locateQueue(internalQueueName);
+ queue.deleteQueue(true);
+ session.getSessionState().removeSubscription(address);
+ consumers.remove(address);
+ consumerQoSLevels.remove(consumer.getID());
+ }
+
+ private SimpleString getQueueNameForTopic(String topic)
+ {
+ return new SimpleString(session.getSessionState().getClientId() + "." + topic);
+ }
+
+ /**
+ * As per MQTT Spec. Subscribes this client to a number of MQTT topics.
+ *
+ * @param subscriptions
+ * @return An array of integers representing the list of accepted QoS for each topic.
+ *
+ * @throws Exception
+ */
+ int[] addSubscriptions(List subscriptions) throws Exception
+ {
+ int[] qos = new int[subscriptions.size()];
+
+ for (int i = 0; i < subscriptions.size(); i++)
+ {
+ addSubscription(subscriptions.get(i));
+ qos[i] = subscriptions.get(i).qualityOfService().value();
+ }
+ return qos;
+ }
+
+ Map getConsumerQoSLevels()
+ {
+ return consumerQoSLevels;
+ }
+
+ ServerConsumer getConsumerForAddress(String address)
+ {
+ return consumers.get(address);
+ }
+}
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
new file mode 100644
index 0000000000..c6f1a65395
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
+import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
+import io.netty.handler.codec.mqtt.MqttTopicSubscription;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
+
+/**
+ * A Utility Class for creating Server Side objects and converting MQTT concepts to/from Artemis.
+ */
+
+public class MQTTUtil
+{
+ // TODO These settings should be configurable.
+ public static final int DEFAULT_SERVER_MESSAGE_BUFFER_SIZE = 512;
+
+ public static final boolean DURABLE_MESSAGES = true;
+
+ public static final boolean SESSION_AUTO_COMMIT_SENDS = true;
+
+ public static final boolean SESSION_AUTO_COMMIT_ACKS = false;
+
+ public static final boolean SESSION_PREACKNOWLEDGE = false;
+
+ public static final boolean SESSION_XA = false;
+
+ public static final boolean SESSION_AUTO_CREATE_QUEUE = false;
+
+ public static final int MAX_MESSAGE_SIZE = 268435455;
+
+ public static final String MQTT_ADDRESS_PREFIX = "$sys.mqtt.";
+
+ public static final String MQTT_RETAIN_ADDRESS_PREFIX = "$sys.mqtt.retain.";
+
+ public static final String MQTT_QOS_LEVEL_KEY = "mqtt.qos.level";
+
+ public static final String MQTT_MESSAGE_ID_KEY = "mqtt.message.id";
+
+ public static final String MQTT_MESSAGE_TYPE_KEY = "mqtt.message.type";
+
+ public static final String MQTT_MESSAGE_RETAIN_KEY = "mqtt.message.retain";
+
+ public static final int DEFAULT_KEEP_ALIVE_FREQUENCY = 5000;
+
+ public static String convertMQTTAddressFilterToCore(String filter)
+ {
+ return MQTT_ADDRESS_PREFIX + swapMQTTAndCoreWildCards(filter);
+ }
+
+ public static String convertCoreAddressFilterToMQTT(String filter)
+ {
+ if (filter.startsWith(MQTT_RETAIN_ADDRESS_PREFIX.toString()))
+ {
+ filter = filter.substring(MQTT_RETAIN_ADDRESS_PREFIX.length(), filter.length());
+ }
+ else if (filter.startsWith(MQTT_ADDRESS_PREFIX.toString()))
+ {
+ filter = filter.substring(MQTT_ADDRESS_PREFIX.length(), filter.length());
+ }
+ return swapMQTTAndCoreWildCards(filter);
+ }
+
+ public static String convertMQTTAddressFilterToCoreRetain(String filter)
+ {
+ return MQTT_RETAIN_ADDRESS_PREFIX + swapMQTTAndCoreWildCards(filter);
+ }
+
+ public static String swapMQTTAndCoreWildCards(String filter)
+ {
+ char[] topicFilter = filter.toCharArray();
+ for (int i = 0; i < topicFilter.length; i++)
+ {
+ switch (topicFilter[i])
+ {
+ case '/':
+ topicFilter[i] = '.'; break;
+ case '.':
+ topicFilter[i] = '/'; break;
+ case '*':
+ topicFilter[i] = '+'; break;
+ case '+':
+ topicFilter[i] = '*'; break;
+ default:
+ break;
+ }
+ }
+ return String.valueOf(topicFilter);
+ }
+
+ private static ServerMessage createServerMessage(MQTTSession session, SimpleString address, boolean retain, int qos)
+ {
+ long id = session.getServer().getStorageManager().generateID();
+
+ ServerMessageImpl message = new ServerMessageImpl(id, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE);
+ message.setAddress(address);
+ message.putBooleanProperty(new SimpleString(MQTT_MESSAGE_RETAIN_KEY), retain);
+ message.putIntProperty(new SimpleString(MQTT_QOS_LEVEL_KEY), qos);
+ return message;
+ }
+
+ public static ServerMessage createServerMessageFromByteBuf(MQTTSession session, String topic, boolean retain, int qos, ByteBuf payload)
+ {
+ String coreAddress = convertMQTTAddressFilterToCore(topic);
+ ServerMessage message = createServerMessage(session, new SimpleString(coreAddress), retain, qos);
+
+ // FIXME does this involve a copy?
+ message.getBodyBuffer().writeBytes(new ChannelBufferWrapper(payload), payload.readableBytes());
+ return message;
+ }
+
+ public static ServerMessage createServerMessageFromString(MQTTSession session, String payload, String topic, int qos, boolean retain)
+ {
+ ServerMessage message = createServerMessage(session, new SimpleString(topic), retain, qos);
+ message.getBodyBuffer().writeString(payload);
+ return message;
+ }
+
+ public static ServerMessage createPubRelMessage(MQTTSession session, SimpleString address, int messageId)
+ {
+ ServerMessage message = createServerMessage(session, address, false, 1);
+ message.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_ID_KEY), messageId);
+ message.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_TYPE_KEY), MqttMessageType.PUBREL.value());
+ return message;
+ }
+ public static void logMessage(MQTTLogger logger, MqttMessage message, boolean inbound)
+ {
+ StringBuilder log = inbound ? new StringBuilder("Received ") : new StringBuilder("Sent ");
+
+ if (message.fixedHeader() != null)
+ {
+ log.append(message.fixedHeader().messageType().toString());
+
+ if (message.variableHeader() instanceof MqttPublishVariableHeader)
+ {
+ log.append("(" + ((MqttPublishVariableHeader) message.variableHeader()).messageId() + ") " + message.fixedHeader().qosLevel());
+ }
+ else if (message.variableHeader() instanceof MqttMessageIdVariableHeader)
+ {
+ log.append("(" + ((MqttMessageIdVariableHeader) message.variableHeader()).messageId() + ")");
+ }
+
+ if (message.fixedHeader().messageType() == MqttMessageType.SUBSCRIBE)
+ {
+ for (MqttTopicSubscription sub : ((MqttSubscribeMessage) message).payload().topicSubscriptions())
+ {
+ log.append("\n\t" + sub.topicName() + " : " + sub.qualityOfService());
+ }
+ }
+
+ logger.debug(log.toString());
+ }
+ }
+
+}
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/resources/META-INF/services/org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory b/artemis-protocols/artemis-mqtt-protocol/src/main/resources/META-INF/services/org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory
new file mode 100644
index 0000000000..2c1103f1eb
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/resources/META-INF/services/org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory
@@ -0,0 +1 @@
+org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManagerFactory
\ No newline at end of file
diff --git a/artemis-protocols/pom.xml b/artemis-protocols/pom.xml
index d052e435f0..5ef449bd6b 100644
--- a/artemis-protocols/pom.xml
+++ b/artemis-protocols/pom.xml
@@ -36,6 +36,7 @@
artemis-openwire-protocol
artemis-proton-plug
artemis-hornetq-protocol
+ artemis-mqtt-protocol
diff --git a/pom.xml b/pom.xml
index 93e0efe53b..28d48efb6c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -166,6 +166,23 @@
+
+
+
+ org.fusesource.mqtt-client
+ mqtt-client
+ 1.10
+ test
+
+
+
+ org.eclipse.paho
+ mqtt-client
+ 0.4.1-SNAPSHOT
+ test
+
+
+
@@ -301,6 +318,12 @@
${netty.version}
+
+ io.netty
+ netty-codec-mqtt
+ 5.0.0.Alpha2
+
+
org.apache.qpid
proton-j
diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml
index 8d580d92e2..3e90194ae6 100644
--- a/tests/integration-tests/pom.xml
+++ b/tests/integration-tests/pom.xml
@@ -32,6 +32,17 @@
2.0.3-final
+
+
+
+ eclipse.m2
+ https://repo.eclipse.org/content/groups/snapshots/
+ false
+ true
+
+
+
+
org.apache.activemq
@@ -121,6 +132,32 @@
artemis-openwire-protocol
${project.version}
+
+ org.apache.activemq
+ artemis-hornetq-protocol
+ ${project.version}
+
+
+
+
+ org.apache.activemq
+ artemis-mqtt-protocol
+ ${project.version}
+
+
+ org.fusesource.mqtt-client
+ mqtt-client
+
+
+ org.eclipse.paho
+ mqtt-client
+
+
+ io.netty
+ netty-codec-mqtt
+
+
+
org.apache.activemq
artemis-aerogear-integration
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/FuseMQTTClientProvider.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/FuseMQTTClientProvider.java
new file mode 100644
index 0000000000..f5dbe30e04
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/FuseMQTTClientProvider.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.mqtt.imported;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.Message;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+
+import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
+
+public class FuseMQTTClientProvider implements MQTTClientProvider
+{
+ private final MQTT mqtt = new MQTT();
+ private BlockingConnection connection;
+
+ @Override
+ public void connect(String host) throws Exception
+ {
+ mqtt.setHost(host);
+ mqtt.setVersion("3.1.1");
+ // shut off connect retry
+ mqtt.setConnectAttemptsMax(0);
+ mqtt.setReconnectAttemptsMax(0);
+ connection = mqtt.blockingConnection();
+
+ connection.connect();
+ }
+
+ @Override
+ public void disconnect() throws Exception
+ {
+ if (this.connection != null)
+ {
+ this.connection.disconnect();
+ }
+ }
+
+ @Override
+ public void publish(String topic, byte[] payload, int qos) throws Exception
+ {
+ publish(topic, payload, qos, false);
+ }
+
+ @Override
+ public void publish(String topic, byte[] payload, int qos, boolean retained) throws Exception
+ {
+ connection.publish(topic, payload, QoS.values()[qos], retained);
+ }
+
+ @Override
+ public void subscribe(String topic, int qos) throws Exception
+ {
+ Topic[] topics = {new Topic(utf8(topic), QoS.values()[qos])};
+ connection.subscribe(topics);
+ }
+
+ @Override
+ public void unsubscribe(String topic) throws Exception
+ {
+ connection.unsubscribe(new String[]{topic});
+ }
+
+ @Override
+ public byte[] receive(int timeout) throws Exception
+ {
+ byte[] result = null;
+ Message message = connection.receive(timeout, TimeUnit.MILLISECONDS);
+ if (message != null)
+ {
+ result = message.getPayload();
+ message.ack();
+ }
+ return result;
+ }
+
+ @Override
+ public void setSslContext(SSLContext sslContext)
+ {
+ mqtt.setSslContext(sslContext);
+ }
+
+ @Override
+ public void setWillMessage(String string)
+ {
+ mqtt.setWillMessage(string);
+ }
+
+ @Override
+ public void setWillTopic(String topic)
+ {
+ mqtt.setWillTopic(topic);
+ }
+
+ @Override
+ public void setClientId(String clientId)
+ {
+ mqtt.setClientId(clientId);
+ }
+
+ @Override
+ public void kill() throws Exception
+ {
+ connection.kill();
+ }
+
+ @Override
+ public void setKeepAlive(int keepAlive) throws Exception
+ {
+ mqtt.setKeepAlive((short) keepAlive);
+ }
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTClientProvider.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTClientProvider.java
new file mode 100644
index 0000000000..f26bfb8b88
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTClientProvider.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.mqtt.imported;
+
+public interface MQTTClientProvider
+{
+ void connect(String host) throws Exception;
+
+ void disconnect() throws Exception;
+
+ void publish(String topic, byte[] payload, int qos, boolean retained) throws Exception;
+
+ void publish(String topic, byte[] payload, int qos) throws Exception;
+
+ void subscribe(String topic, int qos) throws Exception;
+
+ void unsubscribe(String topic) throws Exception;
+
+ byte[] receive(int timeout) throws Exception;
+
+ void setSslContext(javax.net.ssl.SSLContext sslContext);
+
+ void setWillMessage(String string);
+
+ void setWillTopic(String topic);
+
+ void setClientId(String clientId);
+
+ void kill() throws Exception;
+
+ void setKeepAlive(int keepAlive) throws Exception;
+
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
new file mode 100644
index 0000000000..2a790e8ce4
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -0,0 +1,1747 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.mqtt.imported;
+
+import java.lang.reflect.Field;
+import java.net.ProtocolException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
+import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.Message;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import org.fusesource.mqtt.client.Tracer;
+import org.fusesource.mqtt.codec.MQTTFrame;
+import org.fusesource.mqtt.codec.PUBLISH;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.vertx.java.core.impl.ConcurrentHashSet;
+
+/**
+ * MQTT Test imported from ActiveMQ MQTT component.
+ */
+public class MQTTTest extends MQTTTestSupport
+{
+
+ private static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
+
+ private static final int NUM_MESSAGES = 250;
+
+ @Before
+ public void setUp() throws Exception
+ {
+ Field sessions = MQTTSession.class.getDeclaredField("SESSIONS");
+ sessions.setAccessible(true);
+ sessions.set(null, new ConcurrentHashMap<>());
+
+ Field connectedClients = MQTTConnectionManager.class.getDeclaredField("CONNECTED_CLIENTS");
+ connectedClients.setAccessible(true);
+ connectedClients.set(null, new ConcurrentHashSet<>());
+ super.setUp();
+
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testSendAndReceiveMQTT() throws Exception
+ {
+ final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
+ initializeConnection(subscriptionProvider);
+
+ subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE);
+
+ final CountDownLatch latch = new CountDownLatch(NUM_MESSAGES);
+
+ Thread thread = new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ try
+ {
+ byte[] payload = subscriptionProvider.receive(10000);
+ assertNotNull("Should get a message", payload);
+ latch.countDown();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ break;
+ }
+
+ }
+ }
+ });
+ thread.start();
+
+ final MQTTClientProvider publishProvider = getMQTTClientProvider();
+ initializeConnection(publishProvider);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ String payload = "Message " + i;
+ publishProvider.publish("foo/bah", payload.getBytes(), AT_LEAST_ONCE);
+ }
+
+ latch.await(10, TimeUnit.SECONDS);
+ assertEquals(0, latch.getCount());
+ subscriptionProvider.disconnect();
+ publishProvider.disconnect();
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testUnsubscribeMQTT() throws Exception
+ {
+ final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
+ initializeConnection(subscriptionProvider);
+
+ String topic = "foo/bah";
+
+ subscriptionProvider.subscribe(topic, AT_MOST_ONCE);
+
+ final CountDownLatch latch = new CountDownLatch(NUM_MESSAGES / 2);
+
+ Thread thread = new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ try
+ {
+ byte[] payload = subscriptionProvider.receive(10000);
+ assertNotNull("Should get a message", payload);
+ latch.countDown();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ break;
+ }
+
+ }
+ }
+ });
+ thread.start();
+
+ final MQTTClientProvider publishProvider = getMQTTClientProvider();
+ initializeConnection(publishProvider);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ String payload = "Message " + i;
+ if (i == NUM_MESSAGES / 2)
+ {
+ subscriptionProvider.unsubscribe(topic);
+ }
+ publishProvider.publish(topic, payload.getBytes(), AT_LEAST_ONCE);
+ }
+
+ latch.await(20, TimeUnit.SECONDS);
+ assertEquals(0, latch.getCount());
+ subscriptionProvider.disconnect();
+ publishProvider.disconnect();
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testSendAtMostOnceReceiveExactlyOnce() throws Exception
+ {
+ /**
+ * Although subscribing with EXACTLY ONCE, the message gets published
+ * with AT_MOST_ONCE - in MQTT the QoS is always determined by the
+ * message as published - not the wish of the subscriber
+ */
+ final MQTTClientProvider provider = getMQTTClientProvider();
+ initializeConnection(provider);
+ provider.subscribe("foo", EXACTLY_ONCE);
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ String payload = "Test Message: " + i;
+ provider.publish("foo", payload.getBytes(), AT_MOST_ONCE);
+ byte[] message = provider.receive(5000);
+ assertNotNull("Should get a message", message);
+ assertEquals(payload, new String(message));
+ }
+ provider.disconnect();
+ }
+
+ @Test(timeout = 2 * 60 * 1000)
+ public void testSendAtLeastOnceReceiveExactlyOnce() throws Exception
+ {
+ final MQTTClientProvider provider = getMQTTClientProvider();
+ initializeConnection(provider);
+ provider.subscribe("foo", EXACTLY_ONCE);
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ String payload = "Test Message: " + i;
+ provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
+ byte[] message = provider.receive(5000);
+ assertNotNull("Should get a message", message);
+ assertEquals(payload, new String(message));
+ }
+ provider.disconnect();
+ }
+
+ @Test(timeout = 2 * 60 * 1000)
+ public void testSendAtLeastOnceReceiveAtMostOnce() throws Exception
+ {
+ final MQTTClientProvider provider = getMQTTClientProvider();
+ initializeConnection(provider);
+ provider.subscribe("foo", AT_MOST_ONCE);
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ String payload = "Test Message: " + i;
+ provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
+ byte[] message = provider.receive(5000);
+ assertNotNull("Should get a message", message);
+ assertEquals(payload, new String(message));
+ }
+ provider.disconnect();
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testSendAndReceiveAtMostOnce() throws Exception
+ {
+ final MQTTClientProvider provider = getMQTTClientProvider();
+ initializeConnection(provider);
+ provider.subscribe("foo", AT_MOST_ONCE);
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ String payload = "Test Message: " + i;
+ provider.publish("foo", payload.getBytes(), AT_MOST_ONCE);
+ byte[] message = provider.receive(5000);
+ assertNotNull("Should get a message", message);
+ assertEquals(payload, new String(message));
+ }
+ provider.disconnect();
+ }
+
+ @Test(timeout = 2 * 60 * 1000)
+ public void testSendAndReceiveAtLeastOnce() throws Exception
+ {
+ final MQTTClientProvider provider = getMQTTClientProvider();
+ initializeConnection(provider);
+ provider.subscribe("foo", AT_LEAST_ONCE);
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ String payload = "Test Message: " + i;
+ provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
+ byte[] message = provider.receive(5000);
+ assertNotNull("Should get a message", message);
+ assertEquals(payload, new String(message));
+ }
+ provider.disconnect();
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testSendAndReceiveExactlyOnce() throws Exception
+ {
+ final MQTTClientProvider publisher = getMQTTClientProvider();
+ initializeConnection(publisher);
+
+ final MQTTClientProvider subscriber = getMQTTClientProvider();
+ initializeConnection(subscriber);
+
+ subscriber.subscribe("foo", EXACTLY_ONCE);
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ String payload = "Test Message: " + i;
+ publisher.publish("foo", payload.getBytes(), EXACTLY_ONCE);
+ byte[] message = subscriber.receive(5000);
+ assertNotNull("Should get a message + [" + i + "]", message);
+ assertEquals(payload, new String(message));
+ }
+ subscriber.disconnect();
+ publisher.disconnect();
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testSendAndReceiveLargeMessages() throws Exception
+ {
+ byte[] payload = new byte[1024 * 32];
+ for (int i = 0; i < payload.length; i++)
+ {
+ payload[i] = '2';
+ }
+ final MQTTClientProvider publisher = getMQTTClientProvider();
+ initializeConnection(publisher);
+
+ final MQTTClientProvider subscriber = getMQTTClientProvider();
+ initializeConnection(subscriber);
+
+ subscriber.subscribe("foo", AT_LEAST_ONCE);
+ for (int i = 0; i < 10; i++)
+ {
+ publisher.publish("foo", payload, AT_LEAST_ONCE);
+ byte[] message = subscriber.receive(5000);
+ assertNotNull("Should get a message", message);
+
+ assertArrayEquals(payload, message);
+ }
+ subscriber.disconnect();
+ publisher.disconnect();
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testSendAndReceiveRetainedMessages() throws Exception
+ {
+ final MQTTClientProvider publisher = getMQTTClientProvider();
+ initializeConnection(publisher);
+
+ final MQTTClientProvider subscriber = getMQTTClientProvider();
+ initializeConnection(subscriber);
+
+ String RETAINED = "retained";
+ publisher.publish("foo", RETAINED.getBytes(), AT_LEAST_ONCE, true);
+
+ List messages = new ArrayList();
+ for (int i = 0; i < 10; i++)
+ {
+ messages.add("TEST MESSAGE:" + i);
+ }
+
+ subscriber.subscribe("foo", AT_LEAST_ONCE);
+
+ for (int i = 0; i < 10; i++)
+ {
+ publisher.publish("foo", messages.get(i).getBytes(), AT_LEAST_ONCE);
+ }
+ byte[] msg = subscriber.receive(5000);
+ assertNotNull(msg);
+ assertEquals(RETAINED, new String(msg));
+
+ for (int i = 0; i < 10; i++)
+ {
+ msg = subscriber.receive(5000);
+ assertNotNull(msg);
+ assertEquals(messages.get(i), new String(msg));
+ }
+ subscriber.disconnect();
+ publisher.disconnect();
+ }
+
+ @Test(timeout = 30 * 1000)
+ public void testValidZeroLengthClientId() throws Exception
+ {
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setClientId("");
+ mqtt.setCleanSession(true);
+
+ BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+ connection.disconnect();
+ }
+
+ @Test(timeout = 2 * 60 * 1000)
+ public void testMQTTPathPatterns() throws Exception
+ {
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setClientId("");
+ mqtt.setCleanSession(true);
+
+ BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+
+ final String RETAINED = "RETAINED";
+ String[] topics = {"TopicA", "/TopicA", "/", "TopicA/", "//"};
+ for (String topic : topics)
+ {
+ // test retained message
+ connection.publish(topic, (RETAINED + topic).getBytes(), QoS.AT_LEAST_ONCE, true);
+
+ connection.subscribe(new Topic[]{new Topic(topic, QoS.AT_LEAST_ONCE)});
+ Message msg = connection.receive(5, TimeUnit.SECONDS);
+ assertNotNull("No message for " + topic, msg);
+ assertEquals(RETAINED + topic, new String(msg.getPayload()));
+ msg.ack();
+
+ // test non-retained message
+ connection.publish(topic, topic.getBytes(), QoS.AT_LEAST_ONCE, false);
+ msg = connection.receive(1000, TimeUnit.MILLISECONDS);
+ assertNotNull(msg);
+ assertEquals(topic, new String(msg.getPayload()));
+ msg.ack();
+
+ connection.unsubscribe(new String[]{topic});
+ }
+ connection.disconnect();
+
+ // test wildcard patterns with above topics
+ String[] wildcards = {"#", "+", "+/#", "/+", "+/", "+/+", "+/+/", "+/+/+"};
+ for (String wildcard : wildcards)
+ {
+ final Pattern pattern = Pattern.compile(wildcard.replaceAll("/?#", "(/?.*)*").replaceAll("\\+", "[^/]*"));
+
+ connection = mqtt.blockingConnection();
+ connection.connect();
+ final byte[] qos = connection.subscribe(new Topic[]{new Topic(wildcard, QoS.AT_LEAST_ONCE)});
+ assertNotEquals("Subscribe failed " + wildcard, (byte) 0x80, qos[0]);
+
+ // test retained messages
+ Message msg = connection.receive(5, TimeUnit.SECONDS);
+ do
+ {
+ assertNotNull("RETAINED null " + wildcard, msg);
+ assertTrue("RETAINED prefix " + wildcard, new String(msg.getPayload()).startsWith(RETAINED));
+ assertTrue("RETAINED matching " + wildcard + " " + msg.getTopic(), pattern.matcher(msg.getTopic()).matches());
+ msg.ack();
+ msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+ } while (msg != null);
+
+ // test non-retained message
+ for (String topic : topics)
+ {
+ connection.publish(topic, topic.getBytes(), QoS.AT_LEAST_ONCE, false);
+ }
+ msg = connection.receive(1000, TimeUnit.MILLISECONDS);
+ do
+ {
+ assertNotNull("Non-retained Null " + wildcard, msg);
+ assertTrue("Non-retained matching " + wildcard + " " + msg.getTopic(), pattern.matcher(msg.getTopic()).matches());
+ msg.ack();
+ msg = connection.receive(1000, TimeUnit.MILLISECONDS);
+ } while (msg != null);
+
+ connection.unsubscribe(new String[]{wildcard});
+ connection.disconnect();
+ }
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testMQTTRetainQoS() throws Exception
+ {
+ String[] topics = {"AT_MOST_ONCE", "AT_LEAST_ONCE", "EXACTLY_ONCE"};
+ for (int i = 0; i < topics.length; i++)
+ {
+ final String topic = topics[i];
+
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setClientId("foo");
+ mqtt.setKeepAlive((short) 2);
+
+ final int[] actualQoS = {-1};
+ mqtt.setTracer(new Tracer()
+ {
+ @Override
+ public void onReceive(MQTTFrame frame)
+ {
+ // validate the QoS
+ if (frame.messageType() == PUBLISH.TYPE)
+ {
+ actualQoS[0] = frame.qos().ordinal();
+ }
+ }
+ });
+
+ final BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+ connection.publish(topic, topic.getBytes(), QoS.EXACTLY_ONCE, true);
+ connection.subscribe(new Topic[]{new Topic(topic, QoS.valueOf(topic))});
+
+ final Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+ assertNotNull(msg);
+ assertEquals(topic, new String(msg.getPayload()));
+ int waitCount = 0;
+ while (actualQoS[0] == -1 && waitCount < 10)
+ {
+ Thread.sleep(1000);
+ waitCount++;
+ }
+ assertEquals(i, actualQoS[0]);
+ msg.ack();
+
+ connection.unsubscribe(new String[]{topic});
+ connection.disconnect();
+ }
+
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testDuplicateSubscriptions() throws Exception
+ {
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setClientId("foo");
+ mqtt.setKeepAlive((short) 20);
+
+ final int[] actualQoS = {-1};
+ mqtt.setTracer(new Tracer()
+ {
+ @Override
+ public void onReceive(MQTTFrame frame)
+ {
+ // validate the QoS
+ if (frame.messageType() == PUBLISH.TYPE)
+ {
+ actualQoS[0] = frame.qos().ordinal();
+ }
+ }
+ });
+
+ final BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+
+ final String RETAIN = "RETAIN";
+ connection.publish("TopicA", RETAIN.getBytes(), QoS.EXACTLY_ONCE, true);
+
+ QoS[] qoss = {QoS.AT_MOST_ONCE, QoS.AT_MOST_ONCE, QoS.AT_LEAST_ONCE, QoS.EXACTLY_ONCE};
+ for (QoS qos : qoss)
+ {
+ connection.subscribe(new Topic[]{new Topic("TopicA", qos)});
+
+ final Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+ assertNotNull("No message for " + qos, msg);
+ assertEquals(RETAIN, new String(msg.getPayload()));
+ msg.ack();
+ int waitCount = 0;
+ while (actualQoS[0] == -1 && waitCount < 10)
+ {
+ Thread.sleep(1000);
+ waitCount++;
+ }
+ assertEquals(qos.ordinal(), actualQoS[0]);
+ actualQoS[0] = -1;
+ }
+
+ connection.unsubscribe(new String[]{"TopicA"});
+ connection.disconnect();
+
+ }
+
+ @Test(timeout = 120 * 1000)
+ public void testRetainedMessage() throws Exception
+ {
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setKeepAlive((short) 60);
+
+ final String RETAIN = "RETAIN";
+ final String TOPICA = "TopicA";
+
+ final String[] clientIds = {null, "foo", "durable"};
+ for (String clientId : clientIds)
+ {
+ LOG.info("Testing now with Client ID: {}", clientId);
+
+ mqtt.setClientId(clientId);
+ mqtt.setCleanSession(!"durable".equals(clientId));
+
+ BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+
+ // set retained message and check
+ connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true);
+ connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
+ Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+ assertNotNull("No retained message for " + clientId, msg);
+ assertEquals(RETAIN, new String(msg.getPayload()));
+ msg.ack();
+ assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
+
+ // test duplicate subscription
+ connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
+ msg = connection.receive(15000, TimeUnit.MILLISECONDS);
+ assertNotNull("No retained message on duplicate subscription for " + clientId, msg);
+ assertEquals(RETAIN, new String(msg.getPayload()));
+ msg.ack();
+ assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
+ connection.unsubscribe(new String[]{TOPICA});
+
+ // clear retained message and check that we don't receive it
+ connection.publish(TOPICA, "".getBytes(), QoS.AT_MOST_ONCE, true);
+ connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
+ msg = connection.receive(500, TimeUnit.MILLISECONDS);
+ assertNull("Retained message not cleared for " + clientId, msg);
+ connection.unsubscribe(new String[]{TOPICA});
+
+ // set retained message again and check
+ connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true);
+ connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
+ msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+ assertNotNull("No reset retained message for " + clientId, msg);
+ assertEquals(RETAIN, new String(msg.getPayload()));
+ msg.ack();
+ assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
+
+ // re-connect and check
+ connection.disconnect();
+ connection = mqtt.blockingConnection();
+ connection.connect();
+ connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
+ msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+ assertNotNull("No reset retained message for " + clientId, msg);
+ assertEquals(RETAIN, new String(msg.getPayload()));
+ msg.ack();
+ assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
+
+ connection.unsubscribe(new String[]{TOPICA});
+ connection.disconnect();
+ }
+ }
+
+ @Ignore
+ @Test(timeout = 120 * 1000)
+ public void testRetainedMessageOnVirtualTopics() throws Exception
+ {
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setKeepAlive((short) 60);
+
+ final String RETAIN = "RETAIN";
+ final String TOPICA = "VirtualTopic/TopicA";
+
+ final String[] clientIds = {null, "foo", "durable"};
+ for (String clientId : clientIds)
+ {
+ LOG.info("Testing now with Client ID: {}", clientId);
+
+ mqtt.setClientId(clientId);
+ mqtt.setCleanSession(!"durable".equals(clientId));
+
+ BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+
+ // set retained message and check
+ connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true);
+ connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
+ Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+ assertNotNull("No retained message for " + clientId, msg);
+ assertEquals(RETAIN, new String(msg.getPayload()));
+ msg.ack();
+ assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
+
+ // test duplicate subscription
+ connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
+ msg = connection.receive(15000, TimeUnit.MILLISECONDS);
+ assertNotNull("No retained message on duplicate subscription for " + clientId, msg);
+ assertEquals(RETAIN, new String(msg.getPayload()));
+ msg.ack();
+ assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
+ connection.unsubscribe(new String[]{TOPICA});
+
+ // clear retained message and check that we don't receive it
+ connection.publish(TOPICA, "".getBytes(), QoS.AT_MOST_ONCE, true);
+ connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
+ msg = connection.receive(500, TimeUnit.MILLISECONDS);
+ assertNull("Retained message not cleared for " + clientId, msg);
+ connection.unsubscribe(new String[]{TOPICA});
+
+ // set retained message again and check
+ connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true);
+ connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
+ msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+ assertNotNull("No reset retained message for " + clientId, msg);
+ assertEquals(RETAIN, new String(msg.getPayload()));
+ msg.ack();
+ assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
+
+ // re-connect and check
+ connection.disconnect();
+ connection = mqtt.blockingConnection();
+ connection.connect();
+ connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
+ msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+ assertNotNull("No reset retained message for " + clientId, msg);
+ assertEquals(RETAIN, new String(msg.getPayload()));
+ msg.ack();
+ assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
+
+ LOG.info("Test now unsubscribing from: {} for the last time", TOPICA);
+ connection.unsubscribe(new String[]{TOPICA});
+ connection.disconnect();
+ }
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testUniqueMessageIds() throws Exception
+ {
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setClientId("foo");
+ mqtt.setKeepAlive((short) 2);
+ mqtt.setCleanSession(true);
+
+ final List publishList = new ArrayList();
+ mqtt.setTracer(new Tracer()
+ {
+ @Override
+ public void onReceive(MQTTFrame frame)
+ {
+ LOG.info("Client received:\n" + frame);
+ if (frame.messageType() == PUBLISH.TYPE)
+ {
+ PUBLISH publish = new PUBLISH();
+ try
+ {
+ publish.decode(frame);
+ }
+ catch (ProtocolException e)
+ {
+ fail("Error decoding publish " + e.getMessage());
+ }
+ publishList.add(publish);
+ }
+ }
+
+ @Override
+ public void onSend(MQTTFrame frame)
+ {
+ LOG.info("Client sent:\n" + frame);
+ }
+ });
+
+ final BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+
+ // create overlapping subscriptions with different QoSs
+ QoS[] qoss = {QoS.AT_MOST_ONCE, QoS.AT_LEAST_ONCE, QoS.EXACTLY_ONCE};
+ final String TOPIC = "TopicA/";
+
+ // publish retained message
+ connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, true);
+
+ String[] subs = {TOPIC, "TopicA/#", "TopicA/+"};
+ for (int i = 0; i < qoss.length; i++)
+ {
+ connection.subscribe(new Topic[]{new Topic(subs[i], qoss[i])});
+ }
+
+ // publish non-retained message
+ connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
+ int received = 0;
+
+ Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+ do
+ {
+ assertNotNull(msg);
+ assertEquals(TOPIC, new String(msg.getPayload()));
+ msg.ack();
+ int waitCount = 0;
+ while (publishList.size() <= received && waitCount < 10)
+ {
+ Thread.sleep(1000);
+ waitCount++;
+ }
+ msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+ } while (msg != null && received++ < subs.length * 2);
+ assertEquals("Unexpected number of messages", subs.length * 2, received + 1);
+
+ // make sure we received distinct ids for QoS != AT_MOST_ONCE, and 0 for
+ // AT_MOST_ONCE
+ for (int i = 0; i < publishList.size(); i++)
+ {
+ for (int j = i + 1; j < publishList.size(); j++)
+ {
+ final PUBLISH publish1 = publishList.get(i);
+ final PUBLISH publish2 = publishList.get(j);
+ boolean qos0 = false;
+ if (publish1.qos() == QoS.AT_MOST_ONCE)
+ {
+ qos0 = true;
+ assertEquals(0, publish1.messageId());
+ }
+ if (publish2.qos() == QoS.AT_MOST_ONCE)
+ {
+ qos0 = true;
+ assertEquals(0, publish2.messageId());
+ }
+ if (!qos0)
+ {
+ assertNotEquals(publish1.messageId(), publish2.messageId());
+ }
+ }
+ }
+
+ connection.unsubscribe(subs);
+ connection.disconnect();
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testResendMessageId() throws Exception
+ {
+ final MQTT mqtt = createMQTTConnection("resend", false);
+ mqtt.setKeepAlive((short) 5);
+
+ final List publishList = new ArrayList();
+ mqtt.setTracer(new Tracer()
+ {
+ @Override
+ public void onReceive(MQTTFrame frame)
+ {
+ LOG.info("Client received:\n" + frame);
+ if (frame.messageType() == PUBLISH.TYPE)
+ {
+ PUBLISH publish = new PUBLISH();
+ try
+ {
+ publish.decode(frame);
+ }
+ catch (ProtocolException e)
+ {
+ fail("Error decoding publish " + e.getMessage());
+ }
+ publishList.add(publish);
+ }
+ }
+
+ @Override
+ public void onSend(MQTTFrame frame)
+ {
+ LOG.info("Client sent:\n" + frame);
+ }
+ });
+
+ BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+ final String TOPIC = "TopicA/";
+ final String[] topics = new String[]{TOPIC, "TopicA/+"};
+ connection.subscribe(new Topic[]{new Topic(topics[0], QoS.AT_LEAST_ONCE), new Topic(topics[1], QoS.EXACTLY_ONCE)});
+
+ // publish non-retained message
+ connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
+
+ Wait.waitFor(new Wait.Condition()
+ {
+ @Override
+ public boolean isSatisified() throws Exception
+ {
+ return publishList.size() == 2;
+ }
+ }, 5000);
+ assertEquals(2, publishList.size());
+
+ connection.disconnect();
+
+ connection = mqtt.blockingConnection();
+ connection.connect();
+
+ Wait.waitFor(new Wait.Condition()
+ {
+ @Override
+ public boolean isSatisified() throws Exception
+ {
+ return publishList.size() == 4;
+ }
+ }, 5000);
+ assertEquals(4, publishList.size());
+
+ // TODO Investigate if receiving the same ID for overlapping subscriptions is actually spec compliant.
+ // In Artemis we send a new ID for every copy of the message.
+
+ // make sure we received duplicate message ids
+// assertTrue(publishList.get(0).messageId() == publishList.get(2).messageId() || publishList.get(0).messageId() == publishList.get(3).messageId());
+// assertTrue(publishList.get(1).messageId() == publishList.get(3).messageId() || publishList.get(1).messageId() == publishList.get(2).messageId());
+// assertTrue(publishList.get(2).dup() && publishList.get(3).dup());
+
+ connection.unsubscribe(topics);
+ connection.disconnect();
+ }
+
+ @Test(timeout = 90 * 1000)
+ public void testPacketIdGeneratorNonCleanSession() throws Exception
+ {
+ final MQTT mqtt = createMQTTConnection("nonclean-packetid", false);
+ mqtt.setKeepAlive((short) 15);
+
+ final Map publishMap = new ConcurrentHashMap();
+ mqtt.setTracer(new Tracer()
+ {
+ @Override
+ public void onReceive(MQTTFrame frame)
+ {
+ LOG.info("Client received:\n" + frame);
+ if (frame.messageType() == PUBLISH.TYPE)
+ {
+ PUBLISH publish = new PUBLISH();
+ try
+ {
+ publish.decode(frame);
+ LOG.info("PUBLISH " + publish);
+ }
+ catch (ProtocolException e)
+ {
+ fail("Error decoding publish " + e.getMessage());
+ }
+ if (publishMap.get(publish.messageId()) != null)
+ {
+ assertTrue(publish.dup());
+ }
+ publishMap.put(publish.messageId(), publish);
+ }
+ }
+
+ @Override
+ public void onSend(MQTTFrame frame)
+ {
+ LOG.info("Client sent:\n" + frame);
+ }
+ });
+
+ BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+ final String TOPIC = "TopicA/";
+ connection.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)});
+
+ // publish non-retained messages
+ final int TOTAL_MESSAGES = 10;
+ for (int i = 0; i < TOTAL_MESSAGES; i++)
+ {
+ connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
+ }
+
+ // receive half the messages in this session
+ for (int i = 0; i < TOTAL_MESSAGES / 2; i++)
+ {
+ final Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
+ assertNotNull(msg);
+ assertEquals(TOPIC, new String(msg.getPayload()));
+ msg.ack();
+ }
+
+ connection.disconnect();
+ // resume session
+ connection = mqtt.blockingConnection();
+ connection.connect();
+ // receive rest of the messages
+ Message msg = null;
+ do
+ {
+ msg = connection.receive(1000, TimeUnit.MILLISECONDS);
+ if (msg != null)
+ {
+ assertEquals(TOPIC, new String(msg.getPayload()));
+ msg.ack();
+ }
+ } while (msg != null);
+
+ // make sure we received all message ids
+ for (short id = 1; id <= TOTAL_MESSAGES; id++)
+ {
+ assertNotNull("No message for id " + id, publishMap.get(id));
+ }
+
+ connection.unsubscribe(new String[]{TOPIC});
+ connection.disconnect();
+ }
+
+ @Ignore
+ @Test(timeout = 90 * 1000)
+ // TODO ActiveMQ 5.x does not reset the message id generator even after a clean session. In Artemis we always reset.
+ // If there is a good reason for this we should follow ActiveMQ.
+ public void testPacketIdGeneratorCleanSession() throws Exception
+ {
+ final String[] cleanClientIds = new String[]{"", "clean-packetid", null};
+ final Map publishMap = new ConcurrentHashMap();
+ MQTT[] mqtts = new MQTT[cleanClientIds.length];
+ for (int i = 0; i < cleanClientIds.length; i++)
+ {
+ mqtts[i] = createMQTTConnection("", true);
+ mqtts[i].setKeepAlive((short) 15);
+
+ mqtts[i].setTracer(new Tracer()
+ {
+ @Override
+ public void onReceive(MQTTFrame frame)
+ {
+ LOG.info("Client received:\n" + frame);
+ if (frame.messageType() == PUBLISH.TYPE)
+ {
+ PUBLISH publish = new PUBLISH();
+ try
+ {
+ publish.decode(frame);
+ LOG.info("PUBLISH " + publish);
+ }
+ catch (ProtocolException e)
+ {
+ fail("Error decoding publish " + e.getMessage());
+ }
+ if (publishMap.get(publish.messageId()) != null)
+ {
+ assertTrue(publish.dup());
+ }
+ publishMap.put(publish.messageId(), publish);
+ }
+ }
+
+ @Override
+ public void onSend(MQTTFrame frame)
+ {
+ LOG.info("Client sent:\n" + frame);
+ }
+ });
+ }
+
+ final Random random = new Random();
+ for (short i = 0; i < 10; i++)
+ {
+ BlockingConnection connection = mqtts[random.nextInt(cleanClientIds.length)].blockingConnection();
+ connection.connect();
+ final String TOPIC = "TopicA/";
+ connection.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)});
+
+ // publish non-retained message
+ connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
+ Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
+ assertNotNull(msg);
+ assertEquals(TOPIC, new String(msg.getPayload()));
+ msg.ack();
+
+ assertEquals(1, publishMap.size());
+ final short id = (short) (i + 1);
+ assertNotNull("No message for id " + id, publishMap.get(id));
+ publishMap.clear();
+
+ connection.disconnect();
+ }
+
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testClientConnectionFailure() throws Exception
+ {
+ MQTT mqtt = createMQTTConnection("reconnect", false);
+ mqtt.setKeepAlive((short) 1);
+
+ final BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+ Wait.waitFor(new Wait.Condition()
+ {
+ @Override
+ public boolean isSatisified() throws Exception
+ {
+ return connection.isConnected();
+ }
+ });
+
+ final String TOPIC = "TopicA";
+ final byte[] qos = connection.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)});
+ assertEquals(QoS.EXACTLY_ONCE.ordinal(), qos[0]);
+ connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
+ // kill transport
+ connection.kill();
+
+ // FIXME Wait for the previous connection to timeout. This is not required in ActiveMQ. Needs investigating.
+ Thread.sleep(10000);
+
+ final BlockingConnection newConnection = mqtt.blockingConnection();
+ newConnection.connect();
+ Wait.waitFor(new Wait.Condition()
+ {
+ @Override
+ public boolean isSatisified() throws Exception
+ {
+ return newConnection.isConnected();
+ }
+ });
+
+ assertEquals(QoS.EXACTLY_ONCE.ordinal(), qos[0]);
+ Message msg = newConnection.receive(1000, TimeUnit.MILLISECONDS);
+ assertNotNull(msg);
+ assertEquals(TOPIC, new String(msg.getPayload()));
+ msg.ack();
+ newConnection.disconnect();
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testCleanSession() throws Exception
+ {
+ final String CLIENTID = "cleansession";
+ final MQTT mqttNotClean = createMQTTConnection(CLIENTID, false);
+ BlockingConnection notClean = mqttNotClean.blockingConnection();
+ final String TOPIC = "TopicA";
+ notClean.connect();
+ notClean.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)});
+ notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
+ notClean.disconnect();
+
+ // MUST receive message from previous not clean session
+ notClean = mqttNotClean.blockingConnection();
+ notClean.connect();
+ Message msg = notClean.receive(10000, TimeUnit.MILLISECONDS);
+ assertNotNull(msg);
+ assertEquals(TOPIC, new String(msg.getPayload()));
+ msg.ack();
+ notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
+ notClean.disconnect();
+
+ // MUST NOT receive message from previous not clean session
+ final MQTT mqttClean = createMQTTConnection(CLIENTID, true);
+ final BlockingConnection clean = mqttClean.blockingConnection();
+ clean.connect();
+ msg = clean.receive(10000, TimeUnit.MILLISECONDS);
+ assertNull(msg);
+ clean.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)});
+ clean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
+ clean.disconnect();
+
+ // MUST NOT receive message from previous clean session
+ notClean = mqttNotClean.blockingConnection();
+ notClean.connect();
+ msg = notClean.receive(1000, TimeUnit.MILLISECONDS);
+ assertNull(msg);
+ notClean.disconnect();
+ }
+
+ /* TODO These Cross protocol tests were imported from ActiveMQ and need reworking to apply to Artemis. There is an
+ outstanding task to add cross protocol support. This task should rework these tests. The tests are included here
+ and commented out to ensure ActiveMQ and Artemis tests are in sync. */
+
+// @Test(timeout = 60 * 1000)
+// public void testSendMQTTReceiveJMS() throws Exception {
+// doTestSendMQTTReceiveJMS("foo.*");
+// }
+
+// public void doTestSendMQTTReceiveJMS(String destinationName) throws Exception {
+// final MQTTClientProvider provider = getMQTTClientProvider();
+// initializeConnection(provider);
+//
+// // send retained message
+// final String RETAINED = "RETAINED";
+// provider.publish("foo/bah", RETAINED.getBytes(), AT_LEAST_ONCE, true);
+//
+// ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection();
+// // MUST set to true to receive retained messages
+// activeMQConnection.setUseRetroactiveConsumer(true);
+// activeMQConnection.start();
+// Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// javax.jms.Topic jmsTopic = s.createTopic(destinationName);
+// MessageConsumer consumer = s.createConsumer(jmsTopic);
+//
+// // check whether we received retained message on JMS subscribe
+// ActiveMQMessage message = (ActiveMQMessage) consumer.receive(5000);
+// assertNotNull("Should get retained message", message);
+// ByteSequence bs = message.getContent();
+// assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length));
+// assertTrue(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY));
+//
+// for (int i = 0; i < NUM_MESSAGES; i++) {
+// String payload = "Test Message: " + i;
+// provider.publish("foo/bah", payload.getBytes(), AT_LEAST_ONCE);
+// message = (ActiveMQMessage) consumer.receive(5000);
+// assertNotNull("Should get a message", message);
+// bs = message.getContent();
+// assertEquals(payload, new String(bs.data, bs.offset, bs.length));
+// }
+//
+// activeMQConnection.close();
+// provider.disconnect();
+// }
+
+ // TODO As with other tests, this should be enabled as part of the cross protocol support with MQTT.
+// @Test(timeout = 2 * 60 * 1000)
+// public void testSendJMSReceiveMQTT() throws Exception {
+// doTestSendJMSReceiveMQTT("foo.far");
+// }
+
+ // TODO As with other tests, this should be enabled as part of the cross protocol support with MQTT.
+// public void doTestSendJMSReceiveMQTT(String destinationName) throws Exception {
+// final MQTTClientProvider provider = getMQTTClientProvider();
+// initializeConnection(provider);
+//
+// ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection();
+// activeMQConnection.setUseRetroactiveConsumer(true);
+// activeMQConnection.start();
+// Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// javax.jms.Topic jmsTopic = s.createTopic(destinationName);
+// MessageProducer producer = s.createProducer(jmsTopic);
+//
+// // send retained message from JMS
+// final String RETAINED = "RETAINED";
+// TextMessage sendMessage = s.createTextMessage(RETAINED);
+// // mark the message to be retained
+// sendMessage.setBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY, true);
+// // MQTT QoS can be set using MQTTProtocolConverter.QOS_PROPERTY_NAME property
+// sendMessage.setIntProperty(MQTTProtocolConverter.QOS_PROPERTY_NAME, 0);
+// producer.send(sendMessage);
+//
+// provider.subscribe("foo/+", AT_MOST_ONCE);
+// byte[] message = provider.receive(10000);
+// assertNotNull("Should get retained message", message);
+// assertEquals(RETAINED, new String(message));
+//
+// for (int i = 0; i < NUM_MESSAGES; i++) {
+// String payload = "This is Test Message: " + i;
+// sendMessage = s.createTextMessage(payload);
+// producer.send(sendMessage);
+// message = provider.receive(5000);
+// assertNotNull("Should get a message", message);
+//
+// assertEquals(payload, new String(message));
+// }
+// provider.disconnect();
+// activeMQConnection.close();
+// }
+
+ @Test(timeout = 60 * 1000)
+ public void testPingKeepsInactivityMonitorAlive() throws Exception
+ {
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setClientId("foo");
+ mqtt.setKeepAlive((short) 2);
+ final BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+
+ assertTrue("KeepAlive didn't work properly", Wait.waitFor(new Wait.Condition()
+ {
+
+ @Override
+ public boolean isSatisified() throws Exception
+ {
+ return connection.isConnected();
+ }
+ }));
+
+ connection.disconnect();
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testTurnOffInactivityMonitor() throws Exception
+ {
+ stopBroker();
+ protocolConfig = "transport.useInactivityMonitor=false";
+ startBroker();
+
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setClientId("foo3");
+ mqtt.setKeepAlive((short) 2);
+ final BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+
+ assertTrue("KeepAlive didn't work properly", Wait.waitFor(new Wait.Condition()
+ {
+
+ @Override
+ public boolean isSatisified() throws Exception
+ {
+ return connection.isConnected();
+ }
+ }));
+
+ connection.disconnect();
+ }
+
+ @Ignore
+ @Test(timeout = 60 * 1000)
+ // TODO Make dollar topics configurable in code base.
+ public void testPublishDollarTopics() throws Exception
+ {
+ MQTT mqtt = createMQTTConnection();
+ final String clientId = "publishDollar";
+ mqtt.setClientId(clientId);
+ mqtt.setKeepAlive((short) 2);
+ BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+
+ final String DOLLAR_TOPIC = "$TopicA";
+ connection.subscribe(new Topic[]{new Topic(DOLLAR_TOPIC, QoS.EXACTLY_ONCE)});
+ connection.publish(DOLLAR_TOPIC, DOLLAR_TOPIC.getBytes(), QoS.EXACTLY_ONCE, true);
+
+ Message message = connection.receive(10, TimeUnit.SECONDS);
+ assertNull("Publish enabled for $ Topics by default", message);
+ connection.disconnect();
+
+ stopBroker();
+ protocolConfig = "transport.publishDollarTopics=true";
+ startBroker();
+
+ mqtt = createMQTTConnection();
+ mqtt.setClientId(clientId);
+ mqtt.setKeepAlive((short) 2);
+ connection = mqtt.blockingConnection();
+ connection.connect();
+
+ connection.subscribe(new Topic[]{new Topic(DOLLAR_TOPIC, QoS.EXACTLY_ONCE)});
+ connection.publish(DOLLAR_TOPIC, DOLLAR_TOPIC.getBytes(), QoS.EXACTLY_ONCE, true);
+
+ message = connection.receive(10, TimeUnit.SECONDS);
+ assertNotNull(message);
+ message.ack();
+ assertEquals("Message body", DOLLAR_TOPIC, new String(message.getPayload()));
+
+ connection.disconnect();
+ }
+
+ @Ignore
+ @Test(timeout = 60 * 1000)
+ // TODO We currently do not support link stealing. This needs to be enabled for this test to pass.
+ public void testDuplicateClientId() throws Exception
+ {
+ // test link stealing enabled by default
+ final String clientId = "duplicateClient";
+ MQTT mqtt = createMQTTConnection(clientId, false);
+ mqtt.setKeepAlive((short) 2);
+ final BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+ final String TOPICA = "TopicA";
+ connection.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
+
+ MQTT mqtt1 = createMQTTConnection(clientId, false);
+ mqtt1.setKeepAlive((short) 2);
+ final BlockingConnection connection1 = mqtt1.blockingConnection();
+ connection1.connect();
+
+ assertTrue("Duplicate client disconnected", Wait.waitFor(new Wait.Condition()
+ {
+ @Override
+ public boolean isSatisified() throws Exception
+ {
+ return connection1.isConnected();
+ }
+ }));
+
+ assertTrue("Old client still connected", Wait.waitFor(new Wait.Condition()
+ {
+ @Override
+ public boolean isSatisified() throws Exception
+ {
+ return !connection.isConnected();
+ }
+ }));
+
+ connection1.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
+ connection1.disconnect();
+
+ // disable link stealing
+ stopBroker();
+ protocolConfig = "allowLinkStealing=false";
+ startBroker();
+
+ mqtt = createMQTTConnection(clientId, false);
+ mqtt.setKeepAlive((short) 2);
+ final BlockingConnection connection2 = mqtt.blockingConnection();
+ connection2.connect();
+ connection2.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
+
+ mqtt1 = createMQTTConnection(clientId, false);
+ mqtt1.setKeepAlive((short) 2);
+ final BlockingConnection connection3 = mqtt1.blockingConnection();
+ try
+ {
+ connection3.connect();
+ fail("Duplicate client connected");
+ }
+ catch (Exception e)
+ {
+ // ignore
+ }
+
+ assertTrue("Old client disconnected", connection2.isConnected());
+ connection2.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
+ connection2.disconnect();
+ }
+
+ // TODO As with other tests, this should be enabled as part of the cross protocol support with MQTT.
+// @Test(timeout = 30 * 10000)
+// public void testJmsMapping() throws Exception {
+// doTestJmsMapping("test.foo");
+// }
+
+ // TODO As with other tests, this should be enabled as part of the cross protocol support with MQTT.
+// public void doTestJmsMapping(String destinationName) throws Exception {
+// // start up jms consumer
+// Connection jmsConn = cf.createConnection();
+// Session session = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// Destination dest = session.createTopic(destinationName);
+// MessageConsumer consumer = session.createConsumer(dest);
+// jmsConn.start();
+//
+// // set up mqtt producer
+// MQTT mqtt = createMQTTConnection();
+// mqtt.setClientId("foo3");
+// mqtt.setKeepAlive((short) 2);
+// final BlockingConnection connection = mqtt.blockingConnection();
+// connection.connect();
+//
+// int messagesToSend = 5;
+//
+// // publish
+// for (int i = 0; i < messagesToSend; ++i) {
+// connection.publish("test/foo", "hello world".getBytes(), QoS.AT_LEAST_ONCE, false);
+// }
+//
+// connection.disconnect();
+//
+// for (int i = 0; i < messagesToSend; i++) {
+//
+// javax.jms.Message message = consumer.receive(2 * 1000);
+// assertNotNull(message);
+// assertTrue(message instanceof BytesMessage);
+// BytesMessage bytesMessage = (BytesMessage) message;
+//
+// int length = (int) bytesMessage.getBodyLength();
+// byte[] buffer = new byte[length];
+// bytesMessage.readBytes(buffer);
+// assertEquals("hello world", new String(buffer));
+// }
+//
+// jmsConn.close();
+// }
+
+ @Test(timeout = 30 * 10000)
+ public void testSubscribeMultipleTopics() throws Exception
+ {
+
+ byte[] payload = new byte[1024 * 32];
+ for (int i = 0; i < payload.length; i++)
+ {
+ payload[i] = '2';
+ }
+
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setClientId("MQTT-Client");
+ mqtt.setCleanSession(false);
+
+ final BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+
+ Topic[] topics = {new Topic("Topic/A", QoS.EXACTLY_ONCE), new Topic("Topic/B", QoS.EXACTLY_ONCE)};
+ Topic[] wildcardTopic = {new Topic("Topic/#", QoS.AT_LEAST_ONCE)};
+ connection.subscribe(wildcardTopic);
+
+ for (Topic topic : topics)
+ {
+ connection.publish(topic.name().toString(), payload, QoS.AT_LEAST_ONCE, false);
+ }
+
+ int received = 0;
+ for (int i = 0; i < topics.length; ++i)
+ {
+ Message message = connection.receive();
+ assertNotNull(message);
+ received++;
+ payload = message.getPayload();
+ String messageContent = new String(payload);
+ LOG.info("Received message from topic: " + message.getTopic() + " Message content: " + messageContent);
+ message.ack();
+ }
+
+ assertEquals("Should have received " + topics.length + " messages", topics.length, received);
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testReceiveMessageSentWhileOffline() throws Exception
+ {
+ final byte[] payload = new byte[1024 * 32];
+ for (int i = 0; i < payload.length; i++)
+ {
+ payload[i] = '2';
+ }
+
+ int numberOfRuns = 100;
+ int messagesPerRun = 2;
+
+ final MQTT mqttPub = createMQTTConnection("MQTT-Pub-Client", true);
+ final MQTT mqttSub = createMQTTConnection("MQTT-Sub-Client", false);
+
+ final BlockingConnection connectionPub = mqttPub.blockingConnection();
+ connectionPub.connect();
+
+ BlockingConnection connectionSub = mqttSub.blockingConnection();
+ connectionSub.connect();
+
+ Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE)};
+ connectionSub.subscribe(topics);
+
+ for (int i = 0; i < messagesPerRun; ++i)
+ {
+ connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE, false);
+ }
+
+ int received = 0;
+ for (int i = 0; i < messagesPerRun; ++i)
+ {
+ Message message = connectionSub.receive(5, TimeUnit.SECONDS);
+ assertNotNull(message);
+ received++;
+ assertTrue(Arrays.equals(payload, message.getPayload()));
+ message.ack();
+ }
+ connectionSub.disconnect();
+
+ for (int j = 0; j < numberOfRuns; j++)
+ {
+
+ for (int i = 0; i < messagesPerRun; ++i)
+ {
+ connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE, false);
+ }
+
+ connectionSub = mqttSub.blockingConnection();
+ connectionSub.connect();
+ connectionSub.subscribe(topics);
+
+ for (int i = 0; i < messagesPerRun; ++i)
+ {
+ Message message = connectionSub.receive(5, TimeUnit.SECONDS);
+ assertNotNull(message);
+ received++;
+ assertTrue(Arrays.equals(payload, message.getPayload()));
+ message.ack();
+ }
+ connectionSub.disconnect();
+ }
+ assertEquals("Should have received " + (messagesPerRun * (numberOfRuns + 1)) + " messages", (messagesPerRun * (numberOfRuns + 1)), received);
+ }
+
+ @Test(timeout = 30 * 1000)
+ public void testDefaultKeepAliveWhenClientSpecifiesZero() throws Exception
+ {
+ stopBroker();
+ protocolConfig = "transport.defaultKeepAlive=2000";
+ startBroker();
+
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setClientId("foo");
+ mqtt.setKeepAlive((short) 0);
+ final BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+
+ assertTrue("KeepAlive didn't work properly", Wait.waitFor(new Wait.Condition()
+ {
+
+ @Override
+ public boolean isSatisified() throws Exception
+ {
+ return connection.isConnected();
+ }
+ }));
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testReuseConnection() throws Exception
+ {
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setClientId("Test-Client");
+
+ {
+ BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+ connection.disconnect();
+ Thread.sleep(1000);
+ }
+ {
+ BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+ connection.disconnect();
+ Thread.sleep(1000);
+ }
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testNoMessageReceivedAfterUnsubscribeMQTT() throws Exception
+ {
+ Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE)};
+
+ MQTT mqttPub = createMQTTConnection("MQTTPub-Client", true);
+ // mqttPub.setVersion("3.1.1");
+
+ MQTT mqttSub = createMQTTConnection("MQTTSub-Client", false);
+ // mqttSub.setVersion("3.1.1");
+
+ BlockingConnection connectionPub = mqttPub.blockingConnection();
+ connectionPub.connect();
+
+ BlockingConnection connectionSub = mqttSub.blockingConnection();
+ connectionSub.connect();
+ connectionSub.subscribe(topics);
+ connectionSub.disconnect();
+
+ for (int i = 0; i < 5; i++)
+ {
+ String payload = "Message " + i;
+ connectionPub.publish(topics[0].name().toString(), payload.getBytes(), QoS.EXACTLY_ONCE, false);
+ }
+
+ connectionSub = mqttSub.blockingConnection();
+ connectionSub.connect();
+
+ int received = 0;
+ for (int i = 0; i < 5; ++i)
+ {
+ Message message = connectionSub.receive(5, TimeUnit.SECONDS);
+ assertNotNull("Missing message " + i, message);
+ LOG.info("Message is " + new String(message.getPayload()));
+ received++;
+ message.ack();
+ }
+ assertEquals(5, received);
+
+ // unsubscribe from topic
+ connectionSub.unsubscribe(new String[]{"TopicA"});
+
+ // send more messages
+ for (int i = 0; i < 5; i++)
+ {
+ String payload = "Message " + i;
+ connectionPub.publish(topics[0].name().toString(), payload.getBytes(), QoS.EXACTLY_ONCE, false);
+ }
+
+ // these should not be received
+ assertNull(connectionSub.receive(5, TimeUnit.SECONDS));
+
+ connectionSub.disconnect();
+ connectionPub.disconnect();
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testMQTT311Connection() throws Exception
+ {
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setClientId("foo");
+ mqtt.setVersion("3.1.1");
+ final BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+ connection.disconnect();
+ }
+
+ // TODO This should be reworked to align with Artemis recovery.
+// @Test(timeout = 60 * 1000)
+// public void testActiveMQRecoveryPolicy() throws Exception {
+// // test with ActiveMQ LastImageSubscriptionRecoveryPolicy
+// final PolicyMap policyMap = new PolicyMap();
+// final PolicyEntry policyEntry = new PolicyEntry();
+// policyEntry.setSubscriptionRecoveryPolicy(new LastImageSubscriptionRecoveryPolicy());
+// policyMap.put(new ActiveMQTopic(">"), policyEntry);
+// brokerService.setDestinationPolicy(policyMap);
+//
+// MQTT mqtt = createMQTTConnection("pub-sub", true);
+// final int[] retain = new int[1];
+// final int[] nonretain = new int[1];
+// mqtt.setTracer(new Tracer() {
+// @Override
+// public void onReceive(MQTTFrame frame) {
+// if (frame.messageType() == PUBLISH.TYPE) {
+// LOG.info("Received message with retain=" + frame.retain());
+// if (frame.retain()) {
+// retain[0]++;
+// } else {
+// nonretain[0]++;
+// }
+// }
+// }
+// });
+//
+// BlockingConnection connection = mqtt.blockingConnection();
+// connection.connect();
+// final String RETAINED = "RETAINED";
+// connection.publish("one", RETAINED.getBytes(), QoS.AT_LEAST_ONCE, true);
+// connection.publish("two", RETAINED.getBytes(), QoS.AT_LEAST_ONCE, true);
+//
+// final String NONRETAINED = "NONRETAINED";
+// connection.publish("one", NONRETAINED.getBytes(), QoS.AT_LEAST_ONCE, false);
+// connection.publish("two", NONRETAINED.getBytes(), QoS.AT_LEAST_ONCE, false);
+//
+// connection.subscribe(new Topic[]{new Topic("#", QoS.AT_LEAST_ONCE)});
+// for (int i = 0; i < 4; i++) {
+// final Message message = connection.receive(30, TimeUnit.SECONDS);
+// assertNotNull("Should receive 4 messages", message);
+// message.ack();
+// }
+// assertEquals("Should receive 2 retained messages", 2, retain[0]);
+// assertEquals("Should receive 2 non-retained messages", 2, nonretain[0]);
+// }
+
+ // TODO As with other tests, this should be enabled as part of the cross protocol support with MQTT.
+// @Test(timeout = 60 * 1000)
+// public void testSendMQTTReceiveJMSVirtualTopic() throws Exception {
+//
+// final MQTTClientProvider provider = getMQTTClientProvider();
+// initializeConnection(provider);
+// final String DESTINATION_NAME = "Consumer.jms.VirtualTopic.TopicA";
+//
+// // send retained message
+// final String RETAINED = "RETAINED";
+// final String MQTT_DESTINATION_NAME = "VirtualTopic/TopicA";
+// provider.publish(MQTT_DESTINATION_NAME, RETAINED.getBytes(), AT_LEAST_ONCE, true);
+//
+// ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(jmsUri).createConnection();
+// // MUST set to true to receive retained messages
+// activeMQConnection.setUseRetroactiveConsumer(true);
+// activeMQConnection.start();
+// Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// Queue jmsQueue = s.createQueue(DESTINATION_NAME);
+// MessageConsumer consumer = s.createConsumer(jmsQueue);
+//
+// // check whether we received retained message on JMS subscribe
+// ActiveMQMessage message = (ActiveMQMessage) consumer.receive(5000);
+// assertNotNull("Should get retained message", message);
+// ByteSequence bs = message.getContent();
+// assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length));
+// assertTrue(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY));
+//
+// for (int i = 0; i < NUM_MESSAGES; i++) {
+// String payload = "Test Message: " + i;
+// provider.publish(MQTT_DESTINATION_NAME, payload.getBytes(), AT_LEAST_ONCE);
+// message = (ActiveMQMessage) consumer.receive(5000);
+// assertNotNull("Should get a message", message);
+// bs = message.getContent();
+// assertEquals(payload, new String(bs.data, bs.offset, bs.length));
+// }
+//
+// // re-create consumer and check we received retained message again
+// consumer.close();
+// consumer = s.createConsumer(jmsQueue);
+// message = (ActiveMQMessage) consumer.receive(5000);
+// assertNotNull("Should get retained message", message);
+// bs = message.getContent();
+// assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length));
+// assertTrue(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY));
+//
+// activeMQConnection.close();
+// provider.disconnect();
+// }
+
+ @Test(timeout = 60 * 1000)
+ public void testPingOnMQTT() throws Exception
+ {
+ stopBroker();
+ protocolConfig = "maxInactivityDuration=-1";
+ startBroker();
+
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setClientId("test-mqtt");
+ mqtt.setKeepAlive((short) 2);
+ final BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+ assertTrue("KeepAlive didn't work properly", Wait.waitFor(new Wait.Condition()
+ {
+
+ @Override
+ public boolean isSatisified() throws Exception
+ {
+ return connection.isConnected();
+ }
+ }));
+
+ connection.disconnect();
+ }
+}
\ No newline at end of file
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
new file mode 100644
index 0000000000..fd802ec1c0
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
@@ -0,0 +1,376 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.mqtt.imported;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+import java.io.File;
+import java.io.IOException;
+import java.security.ProtectionDomain;
+import java.security.SecureRandom;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.Tracer;
+import org.fusesource.mqtt.codec.MQTTFrame;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MQTTTestSupport extends ActiveMQTestBase
+{
+ private ActiveMQServer server;
+
+ private static final Logger LOG = LoggerFactory.getLogger(MQTTTestSupport.class);
+
+ protected int port = 1883;
+ protected ActiveMQConnectionFactory cf;
+ protected LinkedList exceptions = new LinkedList();
+ protected boolean persistent;
+ protected String protocolConfig;
+ protected String protocolScheme;
+ protected boolean useSSL;
+
+ public static final int AT_MOST_ONCE = 0;
+ public static final int AT_LEAST_ONCE = 1;
+ public static final int EXACTLY_ONCE = 2;
+
+ @Rule
+ public TestName name = new TestName();
+
+ public MQTTTestSupport()
+ {
+ this.protocolScheme = "mqtt";
+ this.useSSL = false;
+ cf = new ActiveMQConnectionFactory(false, new TransportConfiguration(ActiveMQTestBase.NETTY_CONNECTOR_FACTORY));
+ }
+
+ public File basedir() throws IOException
+ {
+ ProtectionDomain protectionDomain = getClass().getProtectionDomain();
+ return new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../..").getCanonicalFile();
+ }
+
+
+ public MQTTTestSupport(String connectorScheme, boolean useSSL)
+ {
+ this.protocolScheme = connectorScheme;
+ this.useSSL = useSSL;
+ }
+
+ public String getName()
+ {
+ return name.getMethodName();
+ }
+
+ @Before
+ public void setUp() throws Exception
+ {
+ String basedir = basedir().getPath();
+ System.setProperty("javax.net.ssl.trustStore", basedir + "/src/test/resources/client.keystore");
+ System.setProperty("javax.net.ssl.trustStorePassword", "password");
+ System.setProperty("javax.net.ssl.trustStoreType", "jks");
+ System.setProperty("javax.net.ssl.keyStore", basedir + "/src/test/resources/server.keystore");
+ System.setProperty("javax.net.ssl.keyStorePassword", "password");
+ System.setProperty("javax.net.ssl.keyStoreType", "jks");
+
+ exceptions.clear();
+ startBroker();
+ }
+
+ @After
+ public void tearDown() throws Exception
+ {
+ stopBroker();
+ }
+
+ public void startBroker() throws Exception
+ {
+ // TODO Add SSL
+ super.setUp();
+ server = createServer(true, true);
+ addCoreConnector();
+ addMQTTConnector();
+ AddressSettings addressSettings = new AddressSettings();
+ addressSettings.setMaxSizeBytes(999999999);
+ server.getAddressSettingsRepository().addMatch("#", addressSettings);
+ server.start();
+ server.waitForActivation(10, TimeUnit.SECONDS);
+ }
+
+ protected void addCoreConnector() throws Exception
+ {
+ // Overrides of this method can add additional configuration options or add multiple
+ // MQTT transport connectors as needed, the port variable is always supposed to be
+ // assigned the primary MQTT connector's port.
+
+ HashMap params = new HashMap();
+ params.put(TransportConstants.PORT_PROP_NAME, "" + 5445);
+ params.put(TransportConstants.PROTOCOLS_PROP_NAME, "CORE");
+ TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
+ server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
+
+ LOG.info("Added connector {} to broker", getProtocolScheme());
+ }
+
+ protected void addMQTTConnector() throws Exception
+ {
+ // Overrides of this method can add additional configuration options or add multiple
+ // MQTT transport connectors as needed, the port variable is always supposed to be
+ // assigned the primary MQTT connector's port.
+
+ HashMap params = new HashMap();
+ params.put(TransportConstants.PORT_PROP_NAME, "" + port);
+ params.put(TransportConstants.PROTOCOLS_PROP_NAME, "MQTT");
+ TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
+ server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
+
+ LOG.info("Added connector {} to broker", getProtocolScheme());
+ }
+
+ public void stopBroker() throws Exception
+ {
+ if (server.isStarted())
+ {
+ server.stop();
+ server = null;
+ }
+ }
+
+ protected String getQueueName()
+ {
+ return getClass().getName() + "." + name.getMethodName();
+ }
+
+ protected String getTopicName()
+ {
+ return getClass().getName() + "." + name.getMethodName();
+ }
+
+ /**
+ * Initialize an MQTTClientProvider instance. By default this method uses the port that's
+ * assigned to be the TCP based port using the base version of addMQTTConnector. A subclass
+ * can either change the value of port or override this method to assign the correct port.
+ *
+ * @param provider the MQTTClientProvider instance to initialize.
+ * @throws Exception if an error occurs during initialization.
+ */
+ protected void initializeConnection(MQTTClientProvider provider) throws Exception
+ {
+ if (!isUseSSL())
+ {
+ provider.connect("tcp://localhost:" + port);
+ }
+ else
+ {
+ SSLContext ctx = SSLContext.getInstance("TLS");
+ ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
+ provider.setSslContext(ctx);
+ provider.connect("ssl://localhost:" + port);
+ }
+ }
+
+ public String getProtocolScheme()
+ {
+ return protocolScheme;
+ }
+
+ public void setProtocolScheme(String scheme)
+ {
+ this.protocolScheme = scheme;
+ }
+
+ public boolean isUseSSL()
+ {
+ return this.useSSL;
+ }
+
+ public void setUseSSL(boolean useSSL)
+ {
+ this.useSSL = useSSL;
+ }
+
+ public boolean isPersistent()
+ {
+ return persistent;
+ }
+
+ public int getPort()
+ {
+ return this.port;
+ }
+
+ public boolean isSchedulerSupportEnabled()
+ {
+ return false;
+ }
+
+ protected interface Task
+ {
+ void run() throws Exception;
+ }
+
+ protected void within(int time, TimeUnit unit, Task task) throws InterruptedException
+ {
+ long timeMS = unit.toMillis(time);
+ long deadline = System.currentTimeMillis() + timeMS;
+ while (true)
+ {
+ try
+ {
+ task.run();
+ return;
+ }
+ catch (Throwable e)
+ {
+ long remaining = deadline - System.currentTimeMillis();
+ if (remaining <= 0)
+ {
+ if (e instanceof RuntimeException)
+ {
+ throw (RuntimeException) e;
+ }
+ if (e instanceof Error)
+ {
+ throw (Error) e;
+ }
+ throw new RuntimeException(e);
+ }
+ Thread.sleep(Math.min(timeMS / 10, remaining));
+ }
+ }
+ }
+
+ protected MQTTClientProvider getMQTTClientProvider()
+ {
+ return new FuseMQTTClientProvider();
+ }
+
+ protected MQTT createMQTTConnection() throws Exception
+ {
+ MQTT client = createMQTTConnection(null, false);
+ client.setVersion("3.1.1");
+ return client;
+ }
+
+ protected MQTT createMQTTConnection(String clientId, boolean clean) throws Exception
+ {
+ if (isUseSSL())
+ {
+ return createMQTTSslConnection(clientId, clean);
+ }
+ else
+ {
+ return createMQTTTcpConnection(clientId, clean);
+ }
+ }
+
+ private MQTT createMQTTTcpConnection(String clientId, boolean clean) throws Exception
+ {
+ MQTT mqtt = new MQTT();
+ mqtt.setConnectAttemptsMax(1);
+ mqtt.setReconnectAttemptsMax(0);
+ mqtt.setTracer(createTracer());
+ mqtt.setVersion("3.1.1");
+ if (clientId != null)
+ {
+ mqtt.setClientId(clientId);
+ }
+ mqtt.setCleanSession(clean);
+ mqtt.setHost("localhost", port);
+ return mqtt;
+ }
+
+ private MQTT createMQTTSslConnection(String clientId, boolean clean) throws Exception
+ {
+ MQTT mqtt = new MQTT();
+ mqtt.setConnectAttemptsMax(1);
+ mqtt.setReconnectAttemptsMax(0);
+ mqtt.setTracer(createTracer());
+ mqtt.setHost("ssl://localhost:" + port);
+ if (clientId != null)
+ {
+ mqtt.setClientId(clientId);
+ }
+ mqtt.setCleanSession(clean);
+
+ SSLContext ctx = SSLContext.getInstance("TLS");
+ ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
+ mqtt.setSslContext(ctx);
+ return mqtt;
+ }
+
+ protected Tracer createTracer()
+ {
+ return new Tracer()
+ {
+ @Override
+ public void onReceive(MQTTFrame frame)
+ {
+ LOG.info("Client Received:\n" + frame);
+ }
+
+ @Override
+ public void onSend(MQTTFrame frame)
+ {
+ LOG.info("Client Sent:\n" + frame);
+ }
+
+ @Override
+ public void debug(String message, Object... args)
+ {
+ LOG.info(String.format(message, args));
+ }
+ };
+ }
+
+ static class DefaultTrustManager implements X509TrustManager
+ {
+
+ @Override
+ public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException
+ {
+ }
+
+ @Override
+ public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException
+ {
+ }
+
+ @Override
+ public X509Certificate[] getAcceptedIssuers()
+ {
+ return new X509Certificate[0];
+ }
+ }
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/PahoMQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/PahoMQTTTest.java
new file mode 100644
index 0000000000..9a1313bac5
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/PahoMQTTTest.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.mqtt.imported;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTLogger;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.junit.Test;
+
+public class PahoMQTTTest extends MQTTTestSupport
+{
+
+ private static MQTTLogger LOG = MQTTLogger.LOGGER;
+
+ @Test(timeout = 300000)
+ public void testLotsOfClients() throws Exception
+ {
+
+ final int CLIENTS = Integer.getInteger("PahoMQTTTest.CLIENTS", 100);
+ LOG.info("Using: {} clients: " + CLIENTS);
+
+ final AtomicInteger receiveCounter = new AtomicInteger();
+ MqttClient client = createPahoClient("consumer");
+ client.setCallback(new MqttCallback()
+ {
+ @Override
+ public void connectionLost(Throwable cause)
+ {
+ }
+
+ @Override
+ public void messageArrived(String topic, MqttMessage message) throws Exception
+ {
+ receiveCounter.incrementAndGet();
+ }
+
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken token)
+ {
+ }
+ });
+ client.connect();
+ client.subscribe("test");
+
+ final AtomicReference asyncError = new AtomicReference();
+ final CountDownLatch connectedDoneLatch = new CountDownLatch(CLIENTS);
+ final CountDownLatch disconnectDoneLatch = new CountDownLatch(CLIENTS);
+ final CountDownLatch sendBarrier = new CountDownLatch(1);
+
+ for (int i = 0; i < CLIENTS; i++)
+ {
+ Thread.sleep(10);
+ new Thread(null, null, "client:" + i)
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ MqttClient client = createPahoClient(Thread.currentThread().getName());
+ client.connect();
+ connectedDoneLatch.countDown();
+ sendBarrier.await();
+ for (int i = 0; i < 10; i++)
+ {
+ Thread.sleep(1000);
+ client.publish("test", "hello".getBytes(), 1, false);
+ }
+ client.disconnect();
+ client.close();
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ asyncError.set(e);
+ }
+ finally
+ {
+ disconnectDoneLatch.countDown();
+ }
+ }
+ }.start();
+ }
+
+ connectedDoneLatch.await();
+ assertNull("Async error: " + asyncError.get(), asyncError.get());
+ sendBarrier.countDown();
+
+ LOG.info("All clients connected... waiting to receive sent messages...");
+
+ // We should eventually get all the messages.
+ within(30, TimeUnit.SECONDS, new Task()
+ {
+ @Override
+ public void run() throws Exception
+ {
+ assertTrue(receiveCounter.get() == CLIENTS * 10);
+ }
+ });
+
+ LOG.info("All messages received.");
+
+ disconnectDoneLatch.await();
+ assertNull("Async error: " + asyncError.get(), asyncError.get());
+ }
+
+ @Test(timeout = 300000)
+ public void testSendAndReceiveMQTT() throws Exception
+ {
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ MqttClient consumer = createPahoClient("consumerId");
+ MqttClient producer = createPahoClient("producerId");
+
+ consumer.connect();
+ consumer.subscribe("test");
+ consumer.setCallback(new MqttCallback()
+ {
+ @Override
+ public void connectionLost(Throwable cause)
+ {
+
+ }
+
+ @Override
+ public void messageArrived(String topic, MqttMessage message) throws Exception
+ {
+ latch.countDown();
+ }
+
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken token)
+ {
+
+ }
+ });
+
+ producer.connect();
+ producer.publish("test", "hello".getBytes(), 1, false);
+
+ waitForLatch(latch);
+ producer.disconnect();
+ producer.close();
+ }
+
+ private MqttClient createPahoClient(String clientId) throws MqttException
+ {
+ return new MqttClient("tcp://localhost:" + getPort(), clientId, new MemoryPersistence());
+ }
+
+}
\ No newline at end of file
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/util/ResourceLoadingSslContext.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/util/ResourceLoadingSslContext.java
new file mode 100644
index 0000000000..9a4c03f866
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/util/ResourceLoadingSslContext.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.mqtt.imported.util;
+
+import javax.annotation.PostConstruct;
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import java.io.File;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.security.KeyStore;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.activemq.broker.SslContext;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.core.io.FileSystemResource;
+import org.springframework.core.io.Resource;
+import org.springframework.core.io.UrlResource;
+import org.springframework.util.ResourceUtils;
+
+/**
+ * Extends the SslContext so that it's easier to configure from spring.
+ */
+public class ResourceLoadingSslContext extends SslContext
+{
+
+ private String keyStoreType = "jks";
+ private String trustStoreType = "jks";
+
+ private String secureRandomAlgorithm = "SHA1PRNG";
+ private String keyStoreAlgorithm = KeyManagerFactory.getDefaultAlgorithm();
+ private String trustStoreAlgorithm = TrustManagerFactory.getDefaultAlgorithm();
+
+ private String keyStore;
+ private String trustStore;
+
+ private String keyStoreKeyPassword;
+ private String keyStorePassword;
+ private String trustStorePassword;
+
+ /**
+ * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions
+ *
+ * delegates to afterPropertiesSet, done to prevent backwards incompatible
+ * signature change.
+ */
+ @PostConstruct
+ private void postConstruct()
+ {
+ try
+ {
+ afterPropertiesSet();
+ }
+ catch (Exception ex)
+ {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /**
+ * @throws Exception
+ * @org.apache.xbean.InitMethod
+ */
+ public void afterPropertiesSet() throws Exception
+ {
+ keyManagers.addAll(createKeyManagers());
+ trustManagers.addAll(createTrustManagers());
+ if (secureRandom == null)
+ {
+ secureRandom = createSecureRandom();
+ }
+ }
+
+ private SecureRandom createSecureRandom() throws NoSuchAlgorithmException
+ {
+ return SecureRandom.getInstance(secureRandomAlgorithm);
+ }
+
+ private Collection createTrustManagers() throws Exception
+ {
+ KeyStore ks = createTrustManagerKeyStore();
+ if (ks == null)
+ {
+ return new ArrayList(0);
+ }
+
+ TrustManagerFactory tmf = TrustManagerFactory.getInstance(trustStoreAlgorithm);
+ tmf.init(ks);
+ return Arrays.asList(tmf.getTrustManagers());
+ }
+
+ private Collection createKeyManagers() throws Exception
+ {
+ KeyStore ks = createKeyManagerKeyStore();
+ if (ks == null)
+ {
+ return new ArrayList(0);
+ }
+
+ KeyManagerFactory tmf = KeyManagerFactory.getInstance(keyStoreAlgorithm);
+ tmf.init(ks, keyStoreKeyPassword == null ? (keyStorePassword == null ? null : keyStorePassword.toCharArray()) : keyStoreKeyPassword.toCharArray());
+ return Arrays.asList(tmf.getKeyManagers());
+ }
+
+ private KeyStore createTrustManagerKeyStore() throws Exception
+ {
+ if (trustStore == null)
+ {
+ return null;
+ }
+
+ KeyStore ks = KeyStore.getInstance(trustStoreType);
+ InputStream is = resourceFromString(trustStore).getInputStream();
+ try
+ {
+ ks.load(is, trustStorePassword == null ? null : trustStorePassword.toCharArray());
+ }
+ finally
+ {
+ is.close();
+ }
+ return ks;
+ }
+
+ private KeyStore createKeyManagerKeyStore() throws Exception
+ {
+ if (keyStore == null)
+ {
+ return null;
+ }
+
+ KeyStore ks = KeyStore.getInstance(keyStoreType);
+ InputStream is = resourceFromString(keyStore).getInputStream();
+ try
+ {
+ ks.load(is, keyStorePassword == null ? null : keyStorePassword.toCharArray());
+ }
+ finally
+ {
+ is.close();
+ }
+ return ks;
+ }
+
+ public String getTrustStoreType()
+ {
+ return trustStoreType;
+ }
+
+ public String getKeyStoreType()
+ {
+ return keyStoreType;
+ }
+
+ public String getKeyStore()
+ {
+ return keyStore;
+ }
+
+ public void setKeyStore(String keyStore) throws MalformedURLException
+ {
+ this.keyStore = keyStore;
+ }
+
+ public String getTrustStore()
+ {
+ return trustStore;
+ }
+
+ public void setTrustStore(String trustStore) throws MalformedURLException
+ {
+ this.trustStore = trustStore;
+ }
+
+ public String getKeyStoreAlgorithm()
+ {
+ return keyStoreAlgorithm;
+ }
+
+ public void setKeyStoreAlgorithm(String keyAlgorithm)
+ {
+ this.keyStoreAlgorithm = keyAlgorithm;
+ }
+
+ public String getTrustStoreAlgorithm()
+ {
+ return trustStoreAlgorithm;
+ }
+
+ public void setTrustStoreAlgorithm(String trustAlgorithm)
+ {
+ this.trustStoreAlgorithm = trustAlgorithm;
+ }
+
+ public String getKeyStoreKeyPassword()
+ {
+ return keyStoreKeyPassword;
+ }
+
+ public void setKeyStoreKeyPassword(String keyPassword)
+ {
+ this.keyStoreKeyPassword = keyPassword;
+ }
+
+ public String getKeyStorePassword()
+ {
+ return keyStorePassword;
+ }
+
+ public void setKeyStorePassword(String keyPassword)
+ {
+ this.keyStorePassword = keyPassword;
+ }
+
+ public String getTrustStorePassword()
+ {
+ return trustStorePassword;
+ }
+
+ public void setTrustStorePassword(String trustPassword)
+ {
+ this.trustStorePassword = trustPassword;
+ }
+
+ public void setKeyStoreType(String keyType)
+ {
+ this.keyStoreType = keyType;
+ }
+
+ public void setTrustStoreType(String trustType)
+ {
+ this.trustStoreType = trustType;
+ }
+
+ public String getSecureRandomAlgorithm()
+ {
+ return secureRandomAlgorithm;
+ }
+
+ public void setSecureRandomAlgorithm(String secureRandomAlgorithm)
+ {
+ this.secureRandomAlgorithm = secureRandomAlgorithm;
+ }
+
+ public static Resource resourceFromString(String uri) throws MalformedURLException
+ {
+ Resource resource;
+ File file = new File(uri);
+ if (file.exists())
+ {
+ resource = new FileSystemResource(uri);
+ }
+ else if (ResourceUtils.isUrl(uri))
+ {
+ resource = new UrlResource(uri);
+ }
+ else
+ {
+ resource = new ClassPathResource(uri);
+ }
+ return resource;
+ }
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/util/Wait.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/util/Wait.java
new file mode 100644
index 0000000000..84fc3a47fd
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/util/Wait.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.mqtt.imported.util;
+
+
+import java.util.concurrent.TimeUnit;
+
+public class Wait
+{
+
+ public static final long MAX_WAIT_MILLIS = 30 * 1000;
+ public static final int SLEEP_MILLIS = 1000;
+
+ public interface Condition
+ {
+ boolean isSatisified() throws Exception;
+ }
+
+ public static boolean waitFor(Condition condition) throws Exception
+ {
+ return waitFor(condition, MAX_WAIT_MILLIS);
+ }
+
+ public static boolean waitFor(final Condition condition, final long duration) throws Exception
+ {
+ return waitFor(condition, duration, SLEEP_MILLIS);
+ }
+
+ public static boolean waitFor(final Condition condition, final long duration, final int sleepMillis) throws Exception
+ {
+
+ final long expiry = System.currentTimeMillis() + duration;
+ boolean conditionSatisified = condition.isSatisified();
+ while (!conditionSatisified && System.currentTimeMillis() < expiry)
+ {
+ TimeUnit.MILLISECONDS.sleep(sleepMillis);
+ conditionSatisified = condition.isSatisified();
+ }
+ return conditionSatisified;
+ }
+}