ARTEMIS-1244 propagate retain flag of received message
This commit is contained in:
parent
c2ad9cab0d
commit
60fad35cfe
|
@ -291,9 +291,9 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
|
||||||
disconnect(false);
|
disconnect(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected int send(int messageId, String topicName, int qosLevel, ByteBuf payload, int deliveryCount) {
|
protected int send(int messageId, String topicName, int qosLevel, boolean isRetain, ByteBuf payload, int deliveryCount) {
|
||||||
boolean redelivery = qosLevel == 0 ? false : (deliveryCount > 0);
|
boolean redelivery = qosLevel == 0 ? false : (deliveryCount > 0);
|
||||||
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qosLevel), false, 0);
|
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qosLevel), isRetain, 0);
|
||||||
MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topicName, messageId);
|
MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topicName, messageId);
|
||||||
MqttMessage publish = new MqttPublishMessage(header, varHeader, payload);
|
MqttMessage publish = new MqttPublishMessage(header, varHeader, payload);
|
||||||
this.protocolManager.invokeOutgoing(publish, connection);
|
this.protocolManager.invokeOutgoing(publish, connection);
|
||||||
|
|
|
@ -35,6 +35,8 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
|
||||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
|
import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_MESSAGE_RETAIN_KEY;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Handles MQTT Exactly Once (QoS level 2) Protocol.
|
* Handles MQTT Exactly Once (QoS level 2) Protocol.
|
||||||
*/
|
*/
|
||||||
|
@ -256,6 +258,7 @@ public class MQTTPublishManager {
|
||||||
|
|
||||||
private void sendServerMessage(int messageId, CoreMessage message, int deliveryCount, int qos) {
|
private void sendServerMessage(int messageId, CoreMessage message, int deliveryCount, int qos) {
|
||||||
String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString(), session.getWildcardConfiguration());
|
String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString(), session.getWildcardConfiguration());
|
||||||
|
boolean isRetain = message.getBooleanProperty(MQTT_MESSAGE_RETAIN_KEY);
|
||||||
|
|
||||||
ByteBuf payload;
|
ByteBuf payload;
|
||||||
switch (message.getType()) {
|
switch (message.getType()) {
|
||||||
|
@ -274,7 +277,7 @@ public class MQTTPublishManager {
|
||||||
payload = bufferDup.readBytes(bufferDup.writerIndex()).byteBuf();
|
payload = bufferDup.readBytes(bufferDup.writerIndex()).byteBuf();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount);
|
session.getProtocolHandler().send(messageId, address, qos, isRetain, payload, deliveryCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
private int decideQoS(Message message, ServerConsumer consumer) {
|
private int decideQoS(Message message, ServerConsumer consumer) {
|
||||||
|
|
|
@ -61,7 +61,7 @@ public class MQTTUtil {
|
||||||
|
|
||||||
public static final String MQTT_MESSAGE_TYPE_KEY = "mqtt.message.type";
|
public static final String MQTT_MESSAGE_TYPE_KEY = "mqtt.message.type";
|
||||||
|
|
||||||
public static final String MQTT_MESSAGE_RETAIN_KEY = "mqtt.message.retain";
|
public static final SimpleString MQTT_MESSAGE_RETAIN_KEY = new SimpleString("mqtt.message.retain");
|
||||||
|
|
||||||
public static final String MANAGEMENT_QUEUE_PREFIX = "$sys.mqtt.queue.qos2.";
|
public static final String MANAGEMENT_QUEUE_PREFIX = "$sys.mqtt.queue.qos2.";
|
||||||
|
|
||||||
|
@ -102,7 +102,7 @@ public class MQTTUtil {
|
||||||
|
|
||||||
CoreMessage message = new CoreMessage(id, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE);
|
CoreMessage message = new CoreMessage(id, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE);
|
||||||
message.setAddress(address);
|
message.setAddress(address);
|
||||||
message.putBooleanProperty(new SimpleString(MQTT_MESSAGE_RETAIN_KEY), retain);
|
message.putBooleanProperty(MQTT_MESSAGE_RETAIN_KEY, retain);
|
||||||
message.putIntProperty(new SimpleString(MQTT_QOS_LEVEL_KEY), qos);
|
message.putIntProperty(new SimpleString(MQTT_QOS_LEVEL_KEY), qos);
|
||||||
message.setType(Message.BYTES_TYPE);
|
message.setType(Message.BYTES_TYPE);
|
||||||
return message;
|
return message;
|
||||||
|
|
|
@ -64,7 +64,6 @@ public class MQTTInterceptorPropertiesTest extends MQTTTestSupport {
|
||||||
MqttFixedHeader header = message.fixedHeader();
|
MqttFixedHeader header = message.fixedHeader();
|
||||||
assertNotNull(header.messageType());
|
assertNotNull(header.messageType());
|
||||||
assertEquals(header.qosLevel().value(), AT_MOST_ONCE);
|
assertEquals(header.qosLevel().value(), AT_MOST_ONCE);
|
||||||
// TODO resolve the following line based on result of ARTEMIS-1244, currently fails (2.1.0)
|
|
||||||
assertEquals(header.isRetain(), expectedProperties.get(RETAINED));
|
assertEquals(header.isRetain(), expectedProperties.get(RETAINED));
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
collector.addError(t);
|
collector.addError(t);
|
||||||
|
|
Loading…
Reference in New Issue