ARTEMIS-3781 send PUBREC on duplicate PUBLISH for MQTT QoS 2

The MQTT 5 (and 3.1.1) specification states:

    Until it has received the corresponding PUBREL packet, the receiver
MUST acknowledge any subsequent PUBLISH packet with the same Packet
Identifier by sending a PUBREC. It MUST NOT cause duplicate messages to
be delivered to any onward recipients in this case [MQTT-4.3.3-10].

The broker prevents a duplicate message, but it doesn't respond with a
PUBREC. This commit fixes that.
This commit is contained in:
Justin Bertram 2022-04-19 13:04:20 -05:00 committed by clebertsuconic
parent b5539b9da4
commit c5f45ee44d
2 changed files with 66 additions and 2 deletions

View File

@ -268,8 +268,9 @@ public class MQTTPublishManager {
tx.rollback(); tx.rollback();
throw t; throw t;
} }
createMessageAck(messageId, qos, internal);
} }
createMessageAck(messageId, qos, internal);
} }
} }

View File

@ -32,10 +32,12 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport; import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport;
import org.apache.activemq.artemis.tests.util.RandomUtil; import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.tests.util.Wait;
import org.eclipse.paho.mqttv5.client.MqttClient; import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.common.MqttMessage; import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties; import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
@ -55,7 +57,6 @@ import org.junit.Test;
* [MQTT-4.3.2-5] In the QoS 1 delivery protocol, the receiver after it has sent a PUBACK packet the receiver MUST treat any incoming PUBLISH packet that contains the same Packet Identifier as being a new Application Message, irrespective of the setting of its DUP flag. * [MQTT-4.3.2-5] In the QoS 1 delivery protocol, the receiver after it has sent a PUBACK packet the receiver MUST treat any incoming PUBLISH packet that contains the same Packet Identifier as being a new Application Message, irrespective of the setting of its DUP flag.
* [MQTT-4.3.3-6] In the QoS 2 delivery protocol, the sender MUST NOT re-send the PUBLISH once it has sent the corresponding PUBREL packet. * [MQTT-4.3.3-6] In the QoS 2 delivery protocol, the sender MUST NOT re-send the PUBLISH once it has sent the corresponding PUBREL packet.
* [MQTT-4.3.3-9] In the QoS 2 delivery protocol, the receiver if it has sent a PUBREC with a Reason Code of 0x80 or greater, the receiver MUST treat any subsequent PUBLISH packet that contains that Packet Identifier as being a new Application Message. * [MQTT-4.3.3-9] In the QoS 2 delivery protocol, the receiver if it has sent a PUBREC with a Reason Code of 0x80 or greater, the receiver MUST treat any subsequent PUBLISH packet that contains that Packet Identifier as being a new Application Message.
* [MQTT-4.3.3-10] In the QoS 2 delivery protocol, the receiver until it has received the corresponding PUBREL packet, the receiver MUST acknowledge any subsequent PUBLISH packet with the same Packet Identifier by sending a PUBREC. It MUST NOT cause duplicate messages to be delivered to any onward recipients in this case.
* [MQTT-4.3.3-12] In the QoS 2 delivery protocol, the receiver After it has sent a PUBCOMP, the receiver MUST treat any subsequent PUBLISH packet that contains that Packet Identifier as being a new Application Message. * [MQTT-4.3.3-12] In the QoS 2 delivery protocol, the receiver After it has sent a PUBCOMP, the receiver MUST treat any subsequent PUBLISH packet that contains that Packet Identifier as being a new Application Message.
*/ */
@ -521,6 +522,68 @@ public class QoSTests extends MQTT5TestSupport {
consumer.close(); consumer.close();
} }
/*
* [MQTT-4.3.3-10] In the QoS 2 delivery protocol, the receiver until it has received the corresponding PUBREL
* packet, the receiver MUST acknowledge any subsequent PUBLISH packet with the same Packet Identifier by sending a
* PUBREC. It MUST NOT cause duplicate messages to be delivered to any onward recipients in this case.
*
* In this test we simulate a client sending a QoS 2 message but being disconnected before it receives the initial
* PUBREC. When the client connects again it resends the message, but the broker does not duplicate it. It simply
* responds withe appropriate PUBREC and the client completes the QoS process.
*/
@Test(timeout = DEFAULT_TIMEOUT)
public void testQoS2DuplicatePub() throws Exception {
final String TOPIC = RandomUtil.randomString();
final CountDownLatch ackLatch = new CountDownLatch(1);
final AtomicInteger packetId = new AtomicInteger();
AtomicInteger count = new AtomicInteger(0);
final byte[] PAYLOAD = RandomUtil.randomString().getBytes();
MQTTInterceptor incomingInterceptor = (packet, connection) -> {
if (packet.fixedHeader().messageType() == MqttMessageType.PUBLISH) {
packetId.set(((MqttPublishMessage)packet).variableHeader().packetId());
}
return true;
};
MQTTInterceptor outgoingInterceptor = (packet, connection) -> {
if (count.get() == 0 && packet.fixedHeader().messageType() == MqttMessageType.PUBREC) {
ackLatch.countDown();
count.incrementAndGet();
connection.disconnect(true);
return false;
}
return true;
};
server.getRemotingService().addIncomingInterceptor(incomingInterceptor);
server.getRemotingService().addOutgoingInterceptor(outgoingInterceptor);
Queue queue = server.createQueue(new QueueConfiguration(TOPIC));
MqttClient producer = createPahoClient("producer");
MqttConnectionOptions connectionOptions = new MqttConnectionOptions();
connectionOptions.setCleanStart(false);
connectionOptions.setSessionExpiryInterval(999L);
producer.connect(connectionOptions);
try {
producer.publish(TOPIC, PAYLOAD, 2, false);
} catch (Exception e) {
// ignore
}
assertTrue(ackLatch.await(2, TimeUnit.SECONDS));
producer.disconnectForcibly(1, 1, false);
producer.close();
producer = createPahoClient("producer");
producer.connect(connectionOptions);
producer.publish(TOPIC, PAYLOAD, 2, false);
producer.disconnect();
producer.close();
assertEquals(1, queue.getMessageCount());
}
/* /*
* [MQTT-4.3.3-11] In the QoS 2 delivery protocol, the receiver MUST respond to a PUBREL packet by sending a PUBCOMP * [MQTT-4.3.3-11] In the QoS 2 delivery protocol, the receiver MUST respond to a PUBREL packet by sending a PUBCOMP
* packet containing the same Packet Identifier as the PUBREL. * packet containing the same Packet Identifier as the PUBREL.