This commit is contained in:
Clebert Suconic 2017-06-22 12:01:41 -04:00
commit be8eb3ec9f
4 changed files with 8 additions and 6 deletions

View File

@ -291,9 +291,9 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
disconnect(false); disconnect(false);
} }
protected int send(int messageId, String topicName, int qosLevel, ByteBuf payload, int deliveryCount) { protected int send(int messageId, String topicName, int qosLevel, boolean isRetain, ByteBuf payload, int deliveryCount) {
boolean redelivery = qosLevel == 0 ? false : (deliveryCount > 0); boolean redelivery = qosLevel == 0 ? false : (deliveryCount > 0);
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qosLevel), false, 0); MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qosLevel), isRetain, 0);
MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topicName, messageId); MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topicName, messageId);
MqttMessage publish = new MqttPublishMessage(header, varHeader, payload); MqttMessage publish = new MqttPublishMessage(header, varHeader, payload);
this.protocolManager.invokeOutgoing(publish, connection); this.protocolManager.invokeOutgoing(publish, connection);

View File

@ -35,6 +35,8 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.Transaction;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_MESSAGE_RETAIN_KEY;
/** /**
* Handles MQTT Exactly Once (QoS level 2) Protocol. * Handles MQTT Exactly Once (QoS level 2) Protocol.
*/ */
@ -256,6 +258,7 @@ public class MQTTPublishManager {
private void sendServerMessage(int messageId, CoreMessage message, int deliveryCount, int qos) { private void sendServerMessage(int messageId, CoreMessage message, int deliveryCount, int qos) {
String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString(), session.getWildcardConfiguration()); String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString(), session.getWildcardConfiguration());
boolean isRetain = message.getBooleanProperty(MQTT_MESSAGE_RETAIN_KEY);
ByteBuf payload; ByteBuf payload;
switch (message.getType()) { switch (message.getType()) {
@ -274,7 +277,7 @@ public class MQTTPublishManager {
payload = bufferDup.readBytes(bufferDup.writerIndex()).byteBuf(); payload = bufferDup.readBytes(bufferDup.writerIndex()).byteBuf();
break; break;
} }
session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount); session.getProtocolHandler().send(messageId, address, qos, isRetain, payload, deliveryCount);
} }
private int decideQoS(Message message, ServerConsumer consumer) { private int decideQoS(Message message, ServerConsumer consumer) {

View File

@ -61,7 +61,7 @@ public class MQTTUtil {
public static final String MQTT_MESSAGE_TYPE_KEY = "mqtt.message.type"; public static final String MQTT_MESSAGE_TYPE_KEY = "mqtt.message.type";
public static final String MQTT_MESSAGE_RETAIN_KEY = "mqtt.message.retain"; public static final SimpleString MQTT_MESSAGE_RETAIN_KEY = new SimpleString("mqtt.message.retain");
public static final String MANAGEMENT_QUEUE_PREFIX = "$sys.mqtt.queue.qos2."; public static final String MANAGEMENT_QUEUE_PREFIX = "$sys.mqtt.queue.qos2.";
@ -102,7 +102,7 @@ public class MQTTUtil {
CoreMessage message = new CoreMessage(id, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE); CoreMessage message = new CoreMessage(id, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE);
message.setAddress(address); message.setAddress(address);
message.putBooleanProperty(new SimpleString(MQTT_MESSAGE_RETAIN_KEY), retain); message.putBooleanProperty(MQTT_MESSAGE_RETAIN_KEY, retain);
message.putIntProperty(new SimpleString(MQTT_QOS_LEVEL_KEY), qos); message.putIntProperty(new SimpleString(MQTT_QOS_LEVEL_KEY), qos);
message.setType(Message.BYTES_TYPE); message.setType(Message.BYTES_TYPE);
return message; return message;

View File

@ -64,7 +64,6 @@ public class MQTTInterceptorPropertiesTest extends MQTTTestSupport {
MqttFixedHeader header = message.fixedHeader(); MqttFixedHeader header = message.fixedHeader();
assertNotNull(header.messageType()); assertNotNull(header.messageType());
assertEquals(header.qosLevel().value(), AT_MOST_ONCE); assertEquals(header.qosLevel().value(), AT_MOST_ONCE);
// TODO resolve the following line based on result of ARTEMIS-1244, currently fails (2.1.0)
assertEquals(header.isRetain(), expectedProperties.get(RETAINED)); assertEquals(header.isRetain(), expectedProperties.get(RETAINED));
} catch (Throwable t) { } catch (Throwable t) {
collector.addError(t); collector.addError(t);