ARTEMIS-4459 log when ignoring dupe MQTT QoS2 pub

In accordance with the QoS2 protocol outlined in the MQTT
specification(s), once the broker receives a PUBLISH then any other
PUBLISH it receives on that same session with the same packet ID must be
ignored until the QoS2 protocol for that ID is completed.

The broker does this, but it doesn't log anything so it's not clear when
this is actually happening.
This commit is contained in:
Justin Bertram 2023-10-12 15:41:40 -05:00 committed by clebertsuconic
parent c46c6f11f6
commit 055100751c
2 changed files with 11 additions and 5 deletions

View File

@ -61,4 +61,7 @@ public interface MQTTLogger {
@LogMessage(id = 834008, value = "Failed to remove session state for client with ID: {}", level = LogMessage.Level.ERROR)
void failedToRemoveSessionState(String clientID, Exception e);
@LogMessage(id = 834009, value = "Ignoring duplicate MQTT QoS2 PUBLISH packet for packet ID {} from client with ID {}.", level = LogMessage.Level.WARN)
void ignoringQoS2Publish(String clientId, long packetId);
}

View File

@ -223,10 +223,11 @@ public class MQTTPublishManager {
if (qos > 0) {
serverMessage.setDurable(MQTTUtil.DURABLE_MESSAGES);
}
int messageId = message.variableHeader().packetId();
if (qos < 2 || !state.getPubRec().contains(messageId)) {
int packetId = message.variableHeader().packetId();
boolean qos2PublishAlreadyReceived = state.getPubRec().contains(packetId);
if (qos < 2 || !qos2PublishAlreadyReceived) {
if (qos == 2 && !internal)
state.getPubRec().add(messageId);
state.getPubRec().add(packetId);
Transaction tx = session.getServerSession().newTransaction();
try {
@ -252,7 +253,7 @@ public class MQTTPublishManager {
throw e;
}
if (session.getVersion() == MQTTVersion.MQTT_5) {
sendMessageAck(internal, qos, messageId, MQTTReasonCodes.NOT_AUTHORIZED);
sendMessageAck(internal, qos, packetId, MQTTReasonCodes.NOT_AUTHORIZED);
return;
} else if (session.getVersion() == MQTTVersion.MQTT_3_1_1) {
/*
@ -287,9 +288,11 @@ public class MQTTPublishManager {
tx.rollback();
throw t;
}
} else if (qos2PublishAlreadyReceived) {
MQTTLogger.LOGGER.ignoringQoS2Publish(state.getClientId(), packetId);
}
createMessageAck(messageId, qos, internal);
createMessageAck(packetId, qos, internal);
}
}