ARTEMIS-778 Fix MQTT tests, refactor session state
This commit is contained in:
parent
646c8ce7a5
commit
a09348695c
|
@ -96,7 +96,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
|
||||||
|
|
||||||
connection.dataReceived();
|
connection.dataReceived();
|
||||||
|
|
||||||
MQTTUtil.logMessage(log, message, true);
|
MQTTUtil.logMessage(session.getState(), message, true);
|
||||||
|
|
||||||
switch (message.fixedHeader().messageType()) {
|
switch (message.fixedHeader().messageType()) {
|
||||||
case CONNECT:
|
case CONNECT:
|
||||||
|
@ -145,7 +145,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
|
||||||
disconnect();
|
disconnect();
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.warn("Error processing Control Packet, Disconnecting Client" + e.getMessage());
|
log.debug("Error processing Control Packet, Disconnecting Client", e);
|
||||||
disconnect();
|
disconnect();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -243,6 +243,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
|
||||||
|
|
||||||
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
|
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
|
||||||
MqttSubAckMessage ack = new MqttSubAckMessage(header, message.variableHeader(), new MqttSubAckPayload(qos));
|
MqttSubAckMessage ack = new MqttSubAckMessage(header, message.variableHeader(), new MqttSubAckPayload(qos));
|
||||||
|
MQTTUtil.logMessage(session.getSessionState(), ack, false);
|
||||||
ctx.write(ack);
|
ctx.write(ack);
|
||||||
ctx.flush();
|
ctx.flush();
|
||||||
}
|
}
|
||||||
|
@ -255,6 +256,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
|
||||||
session.getSubscriptionManager().removeSubscriptions(message.payload().topics());
|
session.getSubscriptionManager().removeSubscriptions(message.payload().topics());
|
||||||
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
|
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
|
||||||
MqttUnsubAckMessage m = new MqttUnsubAckMessage(header, message.variableHeader());
|
MqttUnsubAckMessage m = new MqttUnsubAckMessage(header, message.variableHeader());
|
||||||
|
MQTTUtil.logMessage(session.getSessionState(), m, false);
|
||||||
ctx.write(m);
|
ctx.write(m);
|
||||||
ctx.flush();
|
ctx.flush();
|
||||||
}
|
}
|
||||||
|
@ -264,7 +266,9 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
|
||||||
}
|
}
|
||||||
|
|
||||||
void handlePingreq(MqttMessage message, ChannelHandlerContext ctx) {
|
void handlePingreq(MqttMessage message, ChannelHandlerContext ctx) {
|
||||||
ctx.write(new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0)));
|
MqttMessage pingResp = new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0));
|
||||||
|
MQTTUtil.logMessage(session.getSessionState(), pingResp, false);
|
||||||
|
ctx.write(pingResp);
|
||||||
ctx.flush();
|
ctx.flush();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -285,6 +289,8 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
|
||||||
MqttMessage publish = new MqttPublishMessage(header, varHeader, payload);
|
MqttMessage publish = new MqttPublishMessage(header, varHeader, payload);
|
||||||
this.protocolManager.invokeOutgoing(publish, connection);
|
this.protocolManager.invokeOutgoing(publish, connection);
|
||||||
|
|
||||||
|
MQTTUtil.logMessage(session.getSessionState(), publish, false);
|
||||||
|
|
||||||
ctx.write(publish);
|
ctx.write(publish);
|
||||||
ctx.flush();
|
ctx.flush();
|
||||||
|
|
||||||
|
|
|
@ -22,11 +22,12 @@ import java.io.UnsupportedEncodingException;
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.buffer.ByteBufAllocator;
|
import io.netty.buffer.ByteBufAllocator;
|
||||||
import io.netty.buffer.EmptyByteBuf;
|
import io.netty.buffer.EmptyByteBuf;
|
||||||
import io.netty.handler.codec.mqtt.MqttMessageType;
|
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
|
||||||
import org.apache.activemq.artemis.api.core.Message;
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
import org.apache.activemq.artemis.api.core.Pair;
|
import org.apache.activemq.artemis.api.core.Pair;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||||
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
||||||
import org.apache.activemq.artemis.core.server.ServerMessage;
|
import org.apache.activemq.artemis.core.server.ServerMessage;
|
||||||
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
|
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
|
||||||
|
@ -48,11 +49,18 @@ public class MQTTPublishManager {
|
||||||
|
|
||||||
private final Object lock = new Object();
|
private final Object lock = new Object();
|
||||||
|
|
||||||
|
private MQTTSessionState state;
|
||||||
|
|
||||||
|
private MQTTSessionState.OutboundStore outboundStore;
|
||||||
|
|
||||||
public MQTTPublishManager(MQTTSession session) {
|
public MQTTPublishManager(MQTTSession session) {
|
||||||
this.session = session;
|
this.session = session;
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized void start() throws Exception {
|
synchronized void start() throws Exception {
|
||||||
|
this.state = session.getSessionState();
|
||||||
|
this.outboundStore = state.getOutboundStore();
|
||||||
|
|
||||||
createManagementAddress();
|
createManagementAddress();
|
||||||
createManagementQueue();
|
createManagementQueue();
|
||||||
createManagementConsumer();
|
createManagementConsumer();
|
||||||
|
@ -75,12 +83,12 @@ public class MQTTPublishManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createManagementAddress() {
|
private void createManagementAddress() {
|
||||||
String clientId = session.getSessionState().getClientId();
|
managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + state.getClientId());
|
||||||
managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + clientId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createManagementQueue() throws Exception {
|
private void createManagementQueue() throws Exception {
|
||||||
if (session.getServer().locateQueue(managementAddress) == null) {
|
Queue q = session.getServer().locateQueue(managementAddress);
|
||||||
|
if (q == null) {
|
||||||
session.getServerSession().createQueue(managementAddress, managementAddress, null, false, MQTTUtil.DURABLE_MESSAGES);
|
session.getServerSession().createQueue(managementAddress, managementAddress, null, false, MQTTUtil.DURABLE_MESSAGES);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -89,10 +97,6 @@ public class MQTTPublishManager {
|
||||||
return consumer == managementConsumer;
|
return consumer == managementConsumer;
|
||||||
}
|
}
|
||||||
|
|
||||||
private int generateMqttId(int qos) {
|
|
||||||
return session.getSessionState().generateId();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Since MQTT Subscriptions can over lap; a client may receive the same message twice. When this happens the client
|
* Since MQTT Subscriptions can over lap; a client may receive the same message twice. When this happens the client
|
||||||
* returns a PubRec or PubAck with ID. But we need to know which consumer to ack, since we only have the ID to go on we
|
* returns a PubRec or PubAck with ID. But we need to know which consumer to ack, since we only have the ID to go on we
|
||||||
|
@ -110,10 +114,8 @@ public class MQTTPublishManager {
|
||||||
sendServerMessage((int) message.getMessageID(), (ServerMessageImpl) message, deliveryCount, qos);
|
sendServerMessage((int) message.getMessageID(), (ServerMessageImpl) message, deliveryCount, qos);
|
||||||
session.getServerSession().acknowledge(consumer.getID(), message.getMessageID());
|
session.getServerSession().acknowledge(consumer.getID(), message.getMessageID());
|
||||||
} else {
|
} else {
|
||||||
String consumerAddress = consumer.getQueue().getAddress().toString();
|
int mqttid = outboundStore.generateMqttId(message.getMessageID(), consumer.getID());
|
||||||
Integer mqttid = generateMqttId(qos);
|
outboundStore.publish(mqttid, message.getMessageID(), consumer.getID());
|
||||||
|
|
||||||
session.getSessionState().addOutbandMessageRef(mqttid, consumerAddress, message.getMessageID(), qos);
|
|
||||||
sendServerMessage(mqttid, (ServerMessageImpl) message, deliveryCount, qos);
|
sendServerMessage(mqttid, (ServerMessageImpl) message, deliveryCount, qos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -128,9 +130,9 @@ public class MQTTPublishManager {
|
||||||
serverMessage.setDurable(MQTTUtil.DURABLE_MESSAGES);
|
serverMessage.setDurable(MQTTUtil.DURABLE_MESSAGES);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (qos < 2 || !session.getSessionState().getPubRec().contains(messageId)) {
|
if (qos < 2 || !state.getPubRec().contains(messageId)) {
|
||||||
if (qos == 2)
|
if (qos == 2)
|
||||||
session.getSessionState().getPubRec().add(messageId);
|
state.getPubRec().add(messageId);
|
||||||
session.getServerSession().send(serverMessage, true);
|
session.getServerSession().send(serverMessage, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -144,12 +146,30 @@ public class MQTTPublishManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
void sendPubRelMessage(ServerMessage message) {
|
void sendPubRelMessage(ServerMessage message) {
|
||||||
if (message.getIntProperty(MQTTUtil.MQTT_MESSAGE_TYPE_KEY) == MqttMessageType.PUBREL.value()) {
|
|
||||||
int messageId = message.getIntProperty(MQTTUtil.MQTT_MESSAGE_ID_KEY);
|
int messageId = message.getIntProperty(MQTTUtil.MQTT_MESSAGE_ID_KEY);
|
||||||
MQTTMessageInfo messageInfo = new MQTTMessageInfo(message.getMessageID(), managementConsumer.getID(), message.getAddress().toString());
|
|
||||||
session.getSessionState().storeMessageRef(messageId, messageInfo, false);
|
|
||||||
session.getProtocolHandler().sendPubRel(messageId);
|
session.getProtocolHandler().sendPubRel(messageId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void handlePubRec(int messageId) throws Exception {
|
||||||
|
try {
|
||||||
|
Pair<Long, Long> ref = outboundStore.publishReceived(messageId);
|
||||||
|
if (ref != null) {
|
||||||
|
ServerMessage m = MQTTUtil.createPubRelMessage(session, managementAddress, messageId);
|
||||||
|
session.getServerSession().send(m, true);
|
||||||
|
session.getServerSession().acknowledge(ref.getB(), ref.getA());
|
||||||
|
} else {
|
||||||
|
session.getProtocolHandler().sendPubRel(messageId);
|
||||||
|
}
|
||||||
|
} catch (ActiveMQIllegalStateException e) {
|
||||||
|
log.warn("MQTT Client(" + session.getSessionState().getClientId() + ") attempted to Ack already Ack'd message");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void handlePubComp(int messageId) throws Exception {
|
||||||
|
Pair<Long, Long> ref = session.getState().getOutboundStore().publishComplete(messageId);
|
||||||
|
if (ref != null) {
|
||||||
|
session.getServerSession().acknowledge(ref.getB(), ref.getA());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createMessageAck(final int messageId, final int qos) {
|
private void createMessageAck(final int messageId, final int qos) {
|
||||||
|
@ -170,38 +190,21 @@ public class MQTTPublishManager {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
void handlePubRec(int messageId) throws Exception {
|
|
||||||
MQTTMessageInfo messageRef = session.getSessionState().getMessageInfo(messageId);
|
|
||||||
if (messageRef != null) {
|
|
||||||
ServerMessage pubRel = MQTTUtil.createPubRelMessage(session, managementAddress, messageId);
|
|
||||||
session.getServerSession().send(pubRel, true);
|
|
||||||
session.getServerSession().acknowledge(messageRef.getConsumerId(), messageRef.getServerMessageId());
|
|
||||||
session.getProtocolHandler().sendPubRel(messageId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void handlePubComp(int messageId) throws Exception {
|
|
||||||
MQTTMessageInfo messageInfo = session.getSessionState().getMessageInfo(messageId);
|
|
||||||
|
|
||||||
// Check to see if this message is stored if not just drop the packet.
|
|
||||||
if (messageInfo != null) {
|
|
||||||
session.getServerSession().acknowledge(managementConsumer.getID(), messageInfo.getServerMessageId());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void handlePubRel(int messageId) {
|
void handlePubRel(int messageId) {
|
||||||
// We don't check to see if a PubRel existed for this message. We assume it did and so send PubComp.
|
// We don't check to see if a PubRel existed for this message. We assume it did and so send PubComp.
|
||||||
session.getSessionState().getPubRec().remove(messageId);
|
state.getPubRec().remove(messageId);
|
||||||
session.getProtocolHandler().sendPubComp(messageId);
|
session.getProtocolHandler().sendPubComp(messageId);
|
||||||
session.getSessionState().removeMessageRef(messageId);
|
state.removeMessageRef(messageId);
|
||||||
}
|
}
|
||||||
|
|
||||||
void handlePubAck(int messageId) throws Exception {
|
void handlePubAck(int messageId) throws Exception {
|
||||||
Pair<String, Long> pub1MessageInfo = session.getSessionState().removeOutbandMessageRef(messageId, 1);
|
try {
|
||||||
if (pub1MessageInfo != null) {
|
Pair<Long, Long> ref = outboundStore.publishAckd(messageId);
|
||||||
String mqttAddress = MQTTUtil.convertCoreAddressFilterToMQTT(pub1MessageInfo.getA());
|
if (ref != null) {
|
||||||
ServerConsumer consumer = session.getSubscriptionManager().getConsumerForAddress(mqttAddress);
|
session.getServerSession().acknowledge(ref.getB(), ref.getA());
|
||||||
session.getServerSession().acknowledge(consumer.getID(), pub1MessageInfo.getB());
|
}
|
||||||
|
} catch (ActiveMQIllegalStateException e) {
|
||||||
|
log.warn("MQTT Client(" + session.getSessionState().getClientId() + ") attempted to Ack already Ack'd message");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -66,9 +66,8 @@ public class MQTTRetainMessageManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void addRetainedMessagesToQueue(SimpleString queueName, String address) throws Exception {
|
void addRetainedMessagesToQueue(Queue queue, String address) throws Exception {
|
||||||
// Queue to add the retained messages to
|
// Queue to add the retained messages to
|
||||||
Queue queue = session.getServer().locateQueue(queueName);
|
|
||||||
|
|
||||||
// The address filter that matches all retained message queues.
|
// The address filter that matches all retained message queues.
|
||||||
String retainAddress = MQTTUtil.convertMQTTAddressFilterToCoreRetain(address);
|
String retainAddress = MQTTUtil.convertMQTTAddressFilterToCoreRetain(address);
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
package org.apache.activemq.artemis.core.protocol.mqtt;
|
package org.apache.activemq.artemis.core.protocol.mqtt;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -48,8 +49,6 @@ public class MQTTSessionState {
|
||||||
|
|
||||||
private boolean attached = false;
|
private boolean attached = false;
|
||||||
|
|
||||||
private MQTTLogger log = MQTTLogger.LOGGER;
|
|
||||||
|
|
||||||
// Objects track the Outbound message references
|
// Objects track the Outbound message references
|
||||||
private Map<Integer, Pair<String, Long>> outboundMessageReferenceStore;
|
private Map<Integer, Pair<String, Long>> outboundMessageReferenceStore;
|
||||||
|
|
||||||
|
@ -60,6 +59,8 @@ public class MQTTSessionState {
|
||||||
// FIXME We should use a better mechanism for creating packet IDs.
|
// FIXME We should use a better mechanism for creating packet IDs.
|
||||||
private AtomicInteger lastId = new AtomicInteger(0);
|
private AtomicInteger lastId = new AtomicInteger(0);
|
||||||
|
|
||||||
|
private final OutboundStore outboundStore = new OutboundStore();
|
||||||
|
|
||||||
public MQTTSessionState(String clientId) {
|
public MQTTSessionState(String clientId) {
|
||||||
this.clientId = clientId;
|
this.clientId = clientId;
|
||||||
|
|
||||||
|
@ -73,53 +74,14 @@ public class MQTTSessionState {
|
||||||
addressMessageMap = new ConcurrentHashMap<>();
|
addressMessageMap = new ConcurrentHashMap<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
int generateId() {
|
OutboundStore getOutboundStore() {
|
||||||
lastId.compareAndSet(Short.MAX_VALUE, 1);
|
return outboundStore;
|
||||||
return lastId.addAndGet(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
void addOutbandMessageRef(int mqttId, String address, long serverMessageId, int qos) {
|
|
||||||
synchronized (outboundLock) {
|
|
||||||
outboundMessageReferenceStore.put(mqttId, new Pair<>(address, serverMessageId));
|
|
||||||
if (qos == 2) {
|
|
||||||
if (reverseOutboundReferenceStore.containsKey(address)) {
|
|
||||||
reverseOutboundReferenceStore.get(address).put(serverMessageId, mqttId);
|
|
||||||
} else {
|
|
||||||
ConcurrentHashMap<Long, Integer> serverToMqttId = new ConcurrentHashMap<>();
|
|
||||||
serverToMqttId.put(serverMessageId, mqttId);
|
|
||||||
reverseOutboundReferenceStore.put(address, serverToMqttId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Pair<String, Long> removeOutbandMessageRef(int mqttId, int qos) {
|
|
||||||
synchronized (outboundLock) {
|
|
||||||
Pair<String, Long> messageInfo = outboundMessageReferenceStore.remove(mqttId);
|
|
||||||
if (qos == 1) {
|
|
||||||
return messageInfo;
|
|
||||||
}
|
|
||||||
|
|
||||||
Map<Long, Integer> map = reverseOutboundReferenceStore.get(messageInfo.getA());
|
|
||||||
if (map != null) {
|
|
||||||
map.remove(messageInfo.getB());
|
|
||||||
if (map.isEmpty()) {
|
|
||||||
reverseOutboundReferenceStore.remove(messageInfo.getA());
|
|
||||||
}
|
|
||||||
return messageInfo;
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Set<Integer> getPubRec() {
|
Set<Integer> getPubRec() {
|
||||||
return pubRec;
|
return pubRec;
|
||||||
}
|
}
|
||||||
|
|
||||||
Set<Integer> getPub() {
|
|
||||||
return pub;
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean getAttached() {
|
boolean getAttached() {
|
||||||
return attached;
|
return attached;
|
||||||
}
|
}
|
||||||
|
@ -185,16 +147,6 @@ public class MQTTSessionState {
|
||||||
this.clientId = clientId;
|
this.clientId = clientId;
|
||||||
}
|
}
|
||||||
|
|
||||||
void storeMessageRef(Integer mqttId, MQTTMessageInfo messageInfo, boolean storeAddress) {
|
|
||||||
messageRefStore.put(mqttId, messageInfo);
|
|
||||||
if (storeAddress) {
|
|
||||||
Map<Long, Integer> addressMap = addressMessageMap.get(messageInfo.getAddress());
|
|
||||||
if (addressMap != null) {
|
|
||||||
addressMap.put(messageInfo.getServerMessageId(), mqttId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void removeMessageRef(Integer mqttId) {
|
void removeMessageRef(Integer mqttId) {
|
||||||
MQTTMessageInfo info = messageRefStore.remove(mqttId);
|
MQTTMessageInfo info = messageRefStore.remove(mqttId);
|
||||||
if (info != null) {
|
if (info != null) {
|
||||||
|
@ -205,7 +157,50 @@ public class MQTTSessionState {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
MQTTMessageInfo getMessageInfo(Integer mqttId) {
|
public class OutboundStore {
|
||||||
return messageRefStore.get(mqttId);
|
|
||||||
|
private final HashMap<String, Integer> artemisToMqttMessageMap = new HashMap<>();
|
||||||
|
|
||||||
|
private final HashMap<Integer, Pair<Long, Long>> mqttToServerIds = new HashMap<>();
|
||||||
|
|
||||||
|
private final Object dataStoreLock = new Object();
|
||||||
|
|
||||||
|
private final AtomicInteger ids = new AtomicInteger(0);
|
||||||
|
|
||||||
|
public int generateMqttId(long serverId, long consumerId) {
|
||||||
|
synchronized (dataStoreLock) {
|
||||||
|
Integer id = artemisToMqttMessageMap.get(consumerId + ":" + serverId);
|
||||||
|
if (id == null) {
|
||||||
|
ids.compareAndSet(Short.MAX_VALUE, 1);
|
||||||
|
id = ids.addAndGet(1);
|
||||||
|
}
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void publish(int mqtt, long serverId, long consumerId) {
|
||||||
|
synchronized (dataStoreLock) {
|
||||||
|
artemisToMqttMessageMap.put(consumerId + ":" + serverId, mqtt);
|
||||||
|
mqttToServerIds.put(mqtt, new Pair(serverId, consumerId));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Pair<Long, Long> publishAckd(int mqtt) {
|
||||||
|
synchronized (dataStoreLock) {
|
||||||
|
Pair p = mqttToServerIds.remove(mqtt);
|
||||||
|
if (p != null) {
|
||||||
|
mqttToServerIds.remove(p.getA());
|
||||||
|
}
|
||||||
|
return p;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Pair<Long, Long> publishReceived(int mqtt) {
|
||||||
|
return publishAckd(mqtt);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Pair<Long, Long> publishComplete(int mqtt) {
|
||||||
|
return publishAckd(mqtt);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,8 +36,6 @@ public class MQTTSubscriptionManager {
|
||||||
|
|
||||||
private ConcurrentMap<String, ServerConsumer> consumers;
|
private ConcurrentMap<String, ServerConsumer> consumers;
|
||||||
|
|
||||||
private MQTTLogger log = MQTTLogger.LOGGER;
|
|
||||||
|
|
||||||
// We filter out Artemis management messages and notifications
|
// We filter out Artemis management messages and notifications
|
||||||
private SimpleString managementFilter;
|
private SimpleString managementFilter;
|
||||||
|
|
||||||
|
@ -63,7 +61,7 @@ public class MQTTSubscriptionManager {
|
||||||
|
|
||||||
synchronized void start() throws Exception {
|
synchronized void start() throws Exception {
|
||||||
for (MqttTopicSubscription subscription : session.getSessionState().getSubscriptions()) {
|
for (MqttTopicSubscription subscription : session.getSessionState().getSubscriptions()) {
|
||||||
SimpleString q = createQueueForSubscription(subscription.topicName(), subscription.qualityOfService().value());
|
Queue q = createQueueForSubscription(subscription.topicName(), subscription.qualityOfService().value());
|
||||||
createConsumerForSubscriptionQueue(q, subscription.topicName(), subscription.qualityOfService().value());
|
createConsumerForSubscriptionQueue(q, subscription.topicName(), subscription.qualityOfService().value());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -86,23 +84,23 @@ public class MQTTSubscriptionManager {
|
||||||
/**
|
/**
|
||||||
* Creates a Queue if it doesn't already exist, based on a topic and address. Returning the queue name.
|
* Creates a Queue if it doesn't already exist, based on a topic and address. Returning the queue name.
|
||||||
*/
|
*/
|
||||||
private SimpleString createQueueForSubscription(String topic, int qos) throws Exception {
|
private Queue createQueueForSubscription(String topic, int qos) throws Exception {
|
||||||
String address = MQTTUtil.convertMQTTAddressFilterToCore(topic);
|
String address = MQTTUtil.convertMQTTAddressFilterToCore(topic);
|
||||||
SimpleString queue = getQueueNameForTopic(address);
|
SimpleString queue = getQueueNameForTopic(address);
|
||||||
|
|
||||||
Queue q = session.getServer().locateQueue(queue);
|
Queue q = session.getServer().locateQueue(queue);
|
||||||
if (q == null) {
|
if (q == null) {
|
||||||
session.getServerSession().createQueue(new SimpleString(address), queue, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0);
|
q = session.getServerSession().createQueue(new SimpleString(address), queue, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0);
|
||||||
}
|
}
|
||||||
return queue;
|
return q;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new consumer for the queue associated with a subscription
|
* Creates a new consumer for the queue associated with a subscription
|
||||||
*/
|
*/
|
||||||
private void createConsumerForSubscriptionQueue(SimpleString queue, String topic, int qos) throws Exception {
|
private void createConsumerForSubscriptionQueue(Queue queue, String topic, int qos) throws Exception {
|
||||||
long cid = session.getServer().getStorageManager().generateID();
|
long cid = session.getServer().getStorageManager().generateID();
|
||||||
ServerConsumer consumer = session.getServerSession().createConsumer(cid, queue, null, false, true, -1);
|
ServerConsumer consumer = session.getServerSession().createConsumer(cid, queue.getName(), null, false, true, -1);
|
||||||
consumer.setStarted(true);
|
consumer.setStarted(true);
|
||||||
|
|
||||||
consumers.put(topic, consumer);
|
consumers.put(topic, consumer);
|
||||||
|
@ -117,7 +115,7 @@ public class MQTTSubscriptionManager {
|
||||||
|
|
||||||
session.getSessionState().addSubscription(subscription);
|
session.getSessionState().addSubscription(subscription);
|
||||||
|
|
||||||
SimpleString q = createQueueForSubscription(topic, qos);
|
Queue q = createQueueForSubscription(topic, qos);
|
||||||
|
|
||||||
if (s == null) {
|
if (s == null) {
|
||||||
createConsumerForSubscriptionQueue(q, topic, qos);
|
createConsumerForSubscriptionQueue(q, topic, qos);
|
||||||
|
@ -171,7 +169,4 @@ public class MQTTSubscriptionManager {
|
||||||
return consumerQoSLevels;
|
return consumerQoSLevels;
|
||||||
}
|
}
|
||||||
|
|
||||||
ServerConsumer getConsumerForAddress(String address) {
|
|
||||||
return consumers.get(address);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -69,6 +69,8 @@ public class MQTTUtil {
|
||||||
return swapMQTTAndCoreWildCards(filter);
|
return swapMQTTAndCoreWildCards(filter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static final MQTTLogger logger = MQTTLogger.LOGGER;
|
||||||
|
|
||||||
public static String convertCoreAddressFilterToMQTT(String filter) {
|
public static String convertCoreAddressFilterToMQTT(String filter) {
|
||||||
if (filter.startsWith(MQTT_RETAIN_ADDRESS_PREFIX)) {
|
if (filter.startsWith(MQTT_RETAIN_ADDRESS_PREFIX)) {
|
||||||
filter = filter.substring(MQTT_RETAIN_ADDRESS_PREFIX.length(), filter.length());
|
filter = filter.substring(MQTT_RETAIN_ADDRESS_PREFIX.length(), filter.length());
|
||||||
|
@ -148,8 +150,20 @@ public class MQTTUtil {
|
||||||
return message;
|
return message;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void logMessage(MQTTLogger logger, MqttMessage message, boolean inbound) {
|
public static void logMessage(MQTTSessionState state, MqttMessage message, boolean inbound) {
|
||||||
StringBuilder log = inbound ? new StringBuilder("Received ") : new StringBuilder("Sent ");
|
if (logger.isTraceEnabled()) {
|
||||||
|
|
||||||
|
StringBuilder log = new StringBuilder("MQTT(");
|
||||||
|
|
||||||
|
if (state != null) {
|
||||||
|
log.append(state.getClientId());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (inbound) {
|
||||||
|
log.append("): IN << ");
|
||||||
|
} else {
|
||||||
|
log.append("): OUT >> ");
|
||||||
|
}
|
||||||
|
|
||||||
if (message.fixedHeader() != null) {
|
if (message.fixedHeader() != null) {
|
||||||
log.append(message.fixedHeader().messageType().toString());
|
log.append(message.fixedHeader().messageType().toString());
|
||||||
|
@ -166,7 +180,8 @@ public class MQTTUtil {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.debug(log.toString());
|
logger.trace(log.toString());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -267,6 +267,32 @@ public class MQTTTest extends MQTTTestSupport {
|
||||||
assertEquals(NUM_MESSAGES, MQTTOutoingInterceptor.getMessageCount());
|
assertEquals(NUM_MESSAGES, MQTTOutoingInterceptor.getMessageCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 600 * 1000)
|
||||||
|
public void testSendMoreThanUniqueId() throws Exception {
|
||||||
|
int messages = (Short.MAX_VALUE * 2) + 1;
|
||||||
|
|
||||||
|
final MQTTClientProvider publisher = getMQTTClientProvider();
|
||||||
|
initializeConnection(publisher);
|
||||||
|
|
||||||
|
final MQTTClientProvider subscriber = getMQTTClientProvider();
|
||||||
|
initializeConnection(subscriber);
|
||||||
|
|
||||||
|
int count = 0;
|
||||||
|
subscriber.subscribe("foo", EXACTLY_ONCE);
|
||||||
|
for (int i = 0; i < messages; i++) {
|
||||||
|
String payload = "Test Message: " + i;
|
||||||
|
publisher.publish("foo", payload.getBytes(), EXACTLY_ONCE);
|
||||||
|
byte[] message = subscriber.receive(5000);
|
||||||
|
assertNotNull("Should get a message + [" + i + "]", message);
|
||||||
|
assertEquals(payload, new String(message));
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(messages, count);
|
||||||
|
subscriber.disconnect();
|
||||||
|
publisher.disconnect();
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 60 * 1000)
|
@Test(timeout = 60 * 1000)
|
||||||
public void testSendAndReceiveLargeMessages() throws Exception {
|
public void testSendAndReceiveLargeMessages() throws Exception {
|
||||||
byte[] payload = new byte[1024 * 32];
|
byte[] payload = new byte[1024 * 32];
|
||||||
|
|
Loading…
Reference in New Issue