diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java index 68648cdaa0..5d73f576ed 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java @@ -96,7 +96,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { connection.dataReceived(); - MQTTUtil.logMessage(log, message, true); + MQTTUtil.logMessage(session.getState(), message, true); switch (message.fixedHeader().messageType()) { case CONNECT: @@ -145,7 +145,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { disconnect(); } } catch (Exception e) { - log.warn("Error processing Control Packet, Disconnecting Client" + e.getMessage()); + log.debug("Error processing Control Packet, Disconnecting Client", e); disconnect(); } } @@ -243,6 +243,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttSubAckMessage ack = new MqttSubAckMessage(header, message.variableHeader(), new MqttSubAckPayload(qos)); + MQTTUtil.logMessage(session.getSessionState(), ack, false); ctx.write(ack); ctx.flush(); } @@ -255,6 +256,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { session.getSubscriptionManager().removeSubscriptions(message.payload().topics()); MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttUnsubAckMessage m = new MqttUnsubAckMessage(header, message.variableHeader()); + MQTTUtil.logMessage(session.getSessionState(), m, false); ctx.write(m); ctx.flush(); } @@ -264,7 +266,9 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { } void handlePingreq(MqttMessage message, ChannelHandlerContext ctx) { - ctx.write(new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0))); + MqttMessage pingResp = new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0)); + MQTTUtil.logMessage(session.getSessionState(), pingResp, false); + ctx.write(pingResp); ctx.flush(); } @@ -285,6 +289,8 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { MqttMessage publish = new MqttPublishMessage(header, varHeader, payload); this.protocolManager.invokeOutgoing(publish, connection); + MQTTUtil.logMessage(session.getSessionState(), publish, false); + ctx.write(publish); ctx.flush(); diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index 96c6bf679d..fb3363fda6 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -22,11 +22,12 @@ import java.io.UnsupportedEncodingException; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.EmptyByteBuf; -import io.netty.handler.codec.mqtt.MqttMessageType; +import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; @@ -48,11 +49,18 @@ public class MQTTPublishManager { private final Object lock = new Object(); + private MQTTSessionState state; + + private MQTTSessionState.OutboundStore outboundStore; + public MQTTPublishManager(MQTTSession session) { this.session = session; } synchronized void start() throws Exception { + this.state = session.getSessionState(); + this.outboundStore = state.getOutboundStore(); + createManagementAddress(); createManagementQueue(); createManagementConsumer(); @@ -75,12 +83,12 @@ public class MQTTPublishManager { } private void createManagementAddress() { - String clientId = session.getSessionState().getClientId(); - managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + clientId); + managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + state.getClientId()); } private void createManagementQueue() throws Exception { - if (session.getServer().locateQueue(managementAddress) == null) { + Queue q = session.getServer().locateQueue(managementAddress); + if (q == null) { session.getServerSession().createQueue(managementAddress, managementAddress, null, false, MQTTUtil.DURABLE_MESSAGES); } } @@ -89,10 +97,6 @@ public class MQTTPublishManager { return consumer == managementConsumer; } - private int generateMqttId(int qos) { - return session.getSessionState().generateId(); - } - /** * Since MQTT Subscriptions can over lap; a client may receive the same message twice. When this happens the client * returns a PubRec or PubAck with ID. But we need to know which consumer to ack, since we only have the ID to go on we @@ -110,10 +114,8 @@ public class MQTTPublishManager { sendServerMessage((int) message.getMessageID(), (ServerMessageImpl) message, deliveryCount, qos); session.getServerSession().acknowledge(consumer.getID(), message.getMessageID()); } else { - String consumerAddress = consumer.getQueue().getAddress().toString(); - Integer mqttid = generateMqttId(qos); - - session.getSessionState().addOutbandMessageRef(mqttid, consumerAddress, message.getMessageID(), qos); + int mqttid = outboundStore.generateMqttId(message.getMessageID(), consumer.getID()); + outboundStore.publish(mqttid, message.getMessageID(), consumer.getID()); sendServerMessage(mqttid, (ServerMessageImpl) message, deliveryCount, qos); } } @@ -128,9 +130,9 @@ public class MQTTPublishManager { serverMessage.setDurable(MQTTUtil.DURABLE_MESSAGES); } - if (qos < 2 || !session.getSessionState().getPubRec().contains(messageId)) { + if (qos < 2 || !state.getPubRec().contains(messageId)) { if (qos == 2) - session.getSessionState().getPubRec().add(messageId); + state.getPubRec().add(messageId); session.getServerSession().send(serverMessage, true); } @@ -144,11 +146,29 @@ public class MQTTPublishManager { } void sendPubRelMessage(ServerMessage message) { - if (message.getIntProperty(MQTTUtil.MQTT_MESSAGE_TYPE_KEY) == MqttMessageType.PUBREL.value()) { - int messageId = message.getIntProperty(MQTTUtil.MQTT_MESSAGE_ID_KEY); - MQTTMessageInfo messageInfo = new MQTTMessageInfo(message.getMessageID(), managementConsumer.getID(), message.getAddress().toString()); - session.getSessionState().storeMessageRef(messageId, messageInfo, false); - session.getProtocolHandler().sendPubRel(messageId); + int messageId = message.getIntProperty(MQTTUtil.MQTT_MESSAGE_ID_KEY); + session.getProtocolHandler().sendPubRel(messageId); + } + + void handlePubRec(int messageId) throws Exception { + try { + Pair ref = outboundStore.publishReceived(messageId); + if (ref != null) { + ServerMessage m = MQTTUtil.createPubRelMessage(session, managementAddress, messageId); + session.getServerSession().send(m, true); + session.getServerSession().acknowledge(ref.getB(), ref.getA()); + } else { + session.getProtocolHandler().sendPubRel(messageId); + } + } catch (ActiveMQIllegalStateException e) { + log.warn("MQTT Client(" + session.getSessionState().getClientId() + ") attempted to Ack already Ack'd message"); + } + } + + void handlePubComp(int messageId) throws Exception { + Pair ref = session.getState().getOutboundStore().publishComplete(messageId); + if (ref != null) { + session.getServerSession().acknowledge(ref.getB(), ref.getA()); } } @@ -170,38 +190,21 @@ public class MQTTPublishManager { }); } - void handlePubRec(int messageId) throws Exception { - MQTTMessageInfo messageRef = session.getSessionState().getMessageInfo(messageId); - if (messageRef != null) { - ServerMessage pubRel = MQTTUtil.createPubRelMessage(session, managementAddress, messageId); - session.getServerSession().send(pubRel, true); - session.getServerSession().acknowledge(messageRef.getConsumerId(), messageRef.getServerMessageId()); - session.getProtocolHandler().sendPubRel(messageId); - } - } - - void handlePubComp(int messageId) throws Exception { - MQTTMessageInfo messageInfo = session.getSessionState().getMessageInfo(messageId); - - // Check to see if this message is stored if not just drop the packet. - if (messageInfo != null) { - session.getServerSession().acknowledge(managementConsumer.getID(), messageInfo.getServerMessageId()); - } - } - void handlePubRel(int messageId) { // We don't check to see if a PubRel existed for this message. We assume it did and so send PubComp. - session.getSessionState().getPubRec().remove(messageId); + state.getPubRec().remove(messageId); session.getProtocolHandler().sendPubComp(messageId); - session.getSessionState().removeMessageRef(messageId); + state.removeMessageRef(messageId); } void handlePubAck(int messageId) throws Exception { - Pair pub1MessageInfo = session.getSessionState().removeOutbandMessageRef(messageId, 1); - if (pub1MessageInfo != null) { - String mqttAddress = MQTTUtil.convertCoreAddressFilterToMQTT(pub1MessageInfo.getA()); - ServerConsumer consumer = session.getSubscriptionManager().getConsumerForAddress(mqttAddress); - session.getServerSession().acknowledge(consumer.getID(), pub1MessageInfo.getB()); + try { + Pair ref = outboundStore.publishAckd(messageId); + if (ref != null) { + session.getServerSession().acknowledge(ref.getB(), ref.getA()); + } + } catch (ActiveMQIllegalStateException e) { + log.warn("MQTT Client(" + session.getSessionState().getClientId() + ") attempted to Ack already Ack'd message"); } } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java index c48f6aada9..008bcd8526 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java @@ -66,9 +66,8 @@ public class MQTTRetainMessageManager { } } - void addRetainedMessagesToQueue(SimpleString queueName, String address) throws Exception { + void addRetainedMessagesToQueue(Queue queue, String address) throws Exception { // Queue to add the retained messages to - Queue queue = session.getServer().locateQueue(queueName); // The address filter that matches all retained message queues. String retainAddress = MQTTUtil.convertMQTTAddressFilterToCoreRetain(address); diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java index dd7a360fae..194fe5eb1f 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.mqtt; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -48,8 +49,6 @@ public class MQTTSessionState { private boolean attached = false; - private MQTTLogger log = MQTTLogger.LOGGER; - // Objects track the Outbound message references private Map> outboundMessageReferenceStore; @@ -60,6 +59,8 @@ public class MQTTSessionState { // FIXME We should use a better mechanism for creating packet IDs. private AtomicInteger lastId = new AtomicInteger(0); + private final OutboundStore outboundStore = new OutboundStore(); + public MQTTSessionState(String clientId) { this.clientId = clientId; @@ -73,53 +74,14 @@ public class MQTTSessionState { addressMessageMap = new ConcurrentHashMap<>(); } - int generateId() { - lastId.compareAndSet(Short.MAX_VALUE, 1); - return lastId.addAndGet(1); - } - - void addOutbandMessageRef(int mqttId, String address, long serverMessageId, int qos) { - synchronized (outboundLock) { - outboundMessageReferenceStore.put(mqttId, new Pair<>(address, serverMessageId)); - if (qos == 2) { - if (reverseOutboundReferenceStore.containsKey(address)) { - reverseOutboundReferenceStore.get(address).put(serverMessageId, mqttId); - } else { - ConcurrentHashMap serverToMqttId = new ConcurrentHashMap<>(); - serverToMqttId.put(serverMessageId, mqttId); - reverseOutboundReferenceStore.put(address, serverToMqttId); - } - } - } - } - - Pair removeOutbandMessageRef(int mqttId, int qos) { - synchronized (outboundLock) { - Pair messageInfo = outboundMessageReferenceStore.remove(mqttId); - if (qos == 1) { - return messageInfo; - } - - Map map = reverseOutboundReferenceStore.get(messageInfo.getA()); - if (map != null) { - map.remove(messageInfo.getB()); - if (map.isEmpty()) { - reverseOutboundReferenceStore.remove(messageInfo.getA()); - } - return messageInfo; - } - return null; - } + OutboundStore getOutboundStore() { + return outboundStore; } Set getPubRec() { return pubRec; } - Set getPub() { - return pub; - } - boolean getAttached() { return attached; } @@ -185,16 +147,6 @@ public class MQTTSessionState { this.clientId = clientId; } - void storeMessageRef(Integer mqttId, MQTTMessageInfo messageInfo, boolean storeAddress) { - messageRefStore.put(mqttId, messageInfo); - if (storeAddress) { - Map addressMap = addressMessageMap.get(messageInfo.getAddress()); - if (addressMap != null) { - addressMap.put(messageInfo.getServerMessageId(), mqttId); - } - } - } - void removeMessageRef(Integer mqttId) { MQTTMessageInfo info = messageRefStore.remove(mqttId); if (info != null) { @@ -205,7 +157,50 @@ public class MQTTSessionState { } } - MQTTMessageInfo getMessageInfo(Integer mqttId) { - return messageRefStore.get(mqttId); + public class OutboundStore { + + private final HashMap artemisToMqttMessageMap = new HashMap<>(); + + private final HashMap> mqttToServerIds = new HashMap<>(); + + private final Object dataStoreLock = new Object(); + + private final AtomicInteger ids = new AtomicInteger(0); + + public int generateMqttId(long serverId, long consumerId) { + synchronized (dataStoreLock) { + Integer id = artemisToMqttMessageMap.get(consumerId + ":" + serverId); + if (id == null) { + ids.compareAndSet(Short.MAX_VALUE, 1); + id = ids.addAndGet(1); + } + return id; + } + } + + public void publish(int mqtt, long serverId, long consumerId) { + synchronized (dataStoreLock) { + artemisToMqttMessageMap.put(consumerId + ":" + serverId, mqtt); + mqttToServerIds.put(mqtt, new Pair(serverId, consumerId)); + } + } + + public Pair publishAckd(int mqtt) { + synchronized (dataStoreLock) { + Pair p = mqttToServerIds.remove(mqtt); + if (p != null) { + mqttToServerIds.remove(p.getA()); + } + return p; + } + } + + public Pair publishReceived(int mqtt) { + return publishAckd(mqtt); + } + + public Pair publishComplete(int mqtt) { + return publishAckd(mqtt); + } } } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java index ea3ab19e16..d894910fc3 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java @@ -36,8 +36,6 @@ public class MQTTSubscriptionManager { private ConcurrentMap consumers; - private MQTTLogger log = MQTTLogger.LOGGER; - // We filter out Artemis management messages and notifications private SimpleString managementFilter; @@ -63,7 +61,7 @@ public class MQTTSubscriptionManager { synchronized void start() throws Exception { for (MqttTopicSubscription subscription : session.getSessionState().getSubscriptions()) { - SimpleString q = createQueueForSubscription(subscription.topicName(), subscription.qualityOfService().value()); + Queue q = createQueueForSubscription(subscription.topicName(), subscription.qualityOfService().value()); createConsumerForSubscriptionQueue(q, subscription.topicName(), subscription.qualityOfService().value()); } } @@ -86,23 +84,23 @@ public class MQTTSubscriptionManager { /** * Creates a Queue if it doesn't already exist, based on a topic and address. Returning the queue name. */ - private SimpleString createQueueForSubscription(String topic, int qos) throws Exception { + private Queue createQueueForSubscription(String topic, int qos) throws Exception { String address = MQTTUtil.convertMQTTAddressFilterToCore(topic); SimpleString queue = getQueueNameForTopic(address); Queue q = session.getServer().locateQueue(queue); if (q == null) { - session.getServerSession().createQueue(new SimpleString(address), queue, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0); + q = session.getServerSession().createQueue(new SimpleString(address), queue, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0); } - return queue; + return q; } /** * Creates a new consumer for the queue associated with a subscription */ - private void createConsumerForSubscriptionQueue(SimpleString queue, String topic, int qos) throws Exception { + private void createConsumerForSubscriptionQueue(Queue queue, String topic, int qos) throws Exception { long cid = session.getServer().getStorageManager().generateID(); - ServerConsumer consumer = session.getServerSession().createConsumer(cid, queue, null, false, true, -1); + ServerConsumer consumer = session.getServerSession().createConsumer(cid, queue.getName(), null, false, true, -1); consumer.setStarted(true); consumers.put(topic, consumer); @@ -117,7 +115,7 @@ public class MQTTSubscriptionManager { session.getSessionState().addSubscription(subscription); - SimpleString q = createQueueForSubscription(topic, qos); + Queue q = createQueueForSubscription(topic, qos); if (s == null) { createConsumerForSubscriptionQueue(q, topic, qos); @@ -171,7 +169,4 @@ public class MQTTSubscriptionManager { return consumerQoSLevels; } - ServerConsumer getConsumerForAddress(String address) { - return consumers.get(address); - } } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java index e6affc1c5d..3638431b2e 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java @@ -69,6 +69,8 @@ public class MQTTUtil { return swapMQTTAndCoreWildCards(filter); } + private static final MQTTLogger logger = MQTTLogger.LOGGER; + public static String convertCoreAddressFilterToMQTT(String filter) { if (filter.startsWith(MQTT_RETAIN_ADDRESS_PREFIX)) { filter = filter.substring(MQTT_RETAIN_ADDRESS_PREFIX.length(), filter.length()); @@ -148,25 +150,38 @@ public class MQTTUtil { return message; } - public static void logMessage(MQTTLogger logger, MqttMessage message, boolean inbound) { - StringBuilder log = inbound ? new StringBuilder("Received ") : new StringBuilder("Sent "); + public static void logMessage(MQTTSessionState state, MqttMessage message, boolean inbound) { + if (logger.isTraceEnabled()) { - if (message.fixedHeader() != null) { - log.append(message.fixedHeader().messageType().toString()); + StringBuilder log = new StringBuilder("MQTT("); - if (message.variableHeader() instanceof MqttPublishVariableHeader) { - log.append("(" + ((MqttPublishVariableHeader) message.variableHeader()).messageId() + ") " + message.fixedHeader().qosLevel()); - } else if (message.variableHeader() instanceof MqttMessageIdVariableHeader) { - log.append("(" + ((MqttMessageIdVariableHeader) message.variableHeader()).messageId() + ")"); + if (state != null) { + log.append(state.getClientId()); } - if (message.fixedHeader().messageType() == MqttMessageType.SUBSCRIBE) { - for (MqttTopicSubscription sub : ((MqttSubscribeMessage) message).payload().topicSubscriptions()) { - log.append("\n\t" + sub.topicName() + " : " + sub.qualityOfService()); + if (inbound) { + log.append("): IN << "); + } else { + log.append("): OUT >> "); + } + + if (message.fixedHeader() != null) { + log.append(message.fixedHeader().messageType().toString()); + + if (message.variableHeader() instanceof MqttPublishVariableHeader) { + log.append("(" + ((MqttPublishVariableHeader) message.variableHeader()).messageId() + ") " + message.fixedHeader().qosLevel()); + } else if (message.variableHeader() instanceof MqttMessageIdVariableHeader) { + log.append("(" + ((MqttMessageIdVariableHeader) message.variableHeader()).messageId() + ")"); } - } - logger.debug(log.toString()); + if (message.fixedHeader().messageType() == MqttMessageType.SUBSCRIBE) { + for (MqttTopicSubscription sub : ((MqttSubscribeMessage) message).payload().topicSubscriptions()) { + log.append("\n\t" + sub.topicName() + " : " + sub.qualityOfService()); + } + } + + logger.trace(log.toString()); + } } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index 7ea7a1e5d6..b809df0c49 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -267,6 +267,32 @@ public class MQTTTest extends MQTTTestSupport { assertEquals(NUM_MESSAGES, MQTTOutoingInterceptor.getMessageCount()); } + @Test(timeout = 600 * 1000) + public void testSendMoreThanUniqueId() throws Exception { + int messages = (Short.MAX_VALUE * 2) + 1; + + final MQTTClientProvider publisher = getMQTTClientProvider(); + initializeConnection(publisher); + + final MQTTClientProvider subscriber = getMQTTClientProvider(); + initializeConnection(subscriber); + + int count = 0; + subscriber.subscribe("foo", EXACTLY_ONCE); + for (int i = 0; i < messages; i++) { + String payload = "Test Message: " + i; + publisher.publish("foo", payload.getBytes(), EXACTLY_ONCE); + byte[] message = subscriber.receive(5000); + assertNotNull("Should get a message + [" + i + "]", message); + assertEquals(payload, new String(message)); + count++; + } + + assertEquals(messages, count); + subscriber.disconnect(); + publisher.disconnect(); + } + @Test(timeout = 60 * 1000) public void testSendAndReceiveLargeMessages() throws Exception { byte[] payload = new byte[1024 * 32];