From 60fad35cfe74eb6cd40d79adf56621f022e79416 Mon Sep 17 00:00:00 2001 From: Jiri Danek Date: Wed, 21 Jun 2017 17:51:56 +0200 Subject: [PATCH] ARTEMIS-1244 propagate retain flag of received message --- .../artemis/core/protocol/mqtt/MQTTProtocolHandler.java | 4 ++-- .../artemis/core/protocol/mqtt/MQTTPublishManager.java | 5 ++++- .../apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java | 4 ++-- .../mqtt/imported/MQTTInterceptorPropertiesTest.java | 1 - 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java index 4edce31ac6..6add2dbb5a 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java @@ -291,9 +291,9 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { disconnect(false); } - protected int send(int messageId, String topicName, int qosLevel, ByteBuf payload, int deliveryCount) { + protected int send(int messageId, String topicName, int qosLevel, boolean isRetain, ByteBuf payload, int deliveryCount) { boolean redelivery = qosLevel == 0 ? false : (deliveryCount > 0); - MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qosLevel), false, 0); + MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qosLevel), isRetain, 0); MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topicName, messageId); MqttMessage publish = new MqttPublishMessage(header, varHeader, payload); this.protocolManager.invokeOutgoing(publish, connection); diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index 8dfaf348d9..5da027abad 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -35,6 +35,8 @@ import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.transaction.Transaction; import org.jboss.logging.Logger; +import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_MESSAGE_RETAIN_KEY; + /** * Handles MQTT Exactly Once (QoS level 2) Protocol. */ @@ -256,6 +258,7 @@ public class MQTTPublishManager { private void sendServerMessage(int messageId, CoreMessage message, int deliveryCount, int qos) { String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString(), session.getWildcardConfiguration()); + boolean isRetain = message.getBooleanProperty(MQTT_MESSAGE_RETAIN_KEY); ByteBuf payload; switch (message.getType()) { @@ -274,7 +277,7 @@ public class MQTTPublishManager { payload = bufferDup.readBytes(bufferDup.writerIndex()).byteBuf(); break; } - session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount); + session.getProtocolHandler().send(messageId, address, qos, isRetain, payload, deliveryCount); } private int decideQoS(Message message, ServerConsumer consumer) { diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java index 0e09fb0c09..76664f62df 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java @@ -61,7 +61,7 @@ public class MQTTUtil { public static final String MQTT_MESSAGE_TYPE_KEY = "mqtt.message.type"; - public static final String MQTT_MESSAGE_RETAIN_KEY = "mqtt.message.retain"; + public static final SimpleString MQTT_MESSAGE_RETAIN_KEY = new SimpleString("mqtt.message.retain"); public static final String MANAGEMENT_QUEUE_PREFIX = "$sys.mqtt.queue.qos2."; @@ -102,7 +102,7 @@ public class MQTTUtil { CoreMessage message = new CoreMessage(id, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE); message.setAddress(address); - message.putBooleanProperty(new SimpleString(MQTT_MESSAGE_RETAIN_KEY), retain); + message.putBooleanProperty(MQTT_MESSAGE_RETAIN_KEY, retain); message.putIntProperty(new SimpleString(MQTT_QOS_LEVEL_KEY), qos); message.setType(Message.BYTES_TYPE); return message; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java index 0b6219456e..375e2f2986 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java @@ -64,7 +64,6 @@ public class MQTTInterceptorPropertiesTest extends MQTTTestSupport { MqttFixedHeader header = message.fixedHeader(); assertNotNull(header.messageType()); assertEquals(header.qosLevel().value(), AT_MOST_ONCE); - // TODO resolve the following line based on result of ARTEMIS-1244, currently fails (2.1.0) assertEquals(header.isRetain(), expectedProperties.get(RETAINED)); } catch (Throwable t) { collector.addError(t);