diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java index 098a756ad8..2cb1f7e2ab 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java @@ -17,13 +17,22 @@ package org.apache.activemq.artemis.core.protocol.mqtt; +import java.nio.charset.StandardCharsets; + import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader; +import io.netty.handler.codec.mqtt.MqttConnectMessage; +import io.netty.handler.codec.mqtt.MqttConnectPayload; +import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; +import io.netty.handler.codec.mqtt.MqttSubAckMessage; import io.netty.handler.codec.mqtt.MqttSubscribeMessage; import io.netty.handler.codec.mqtt.MqttTopicSubscription; +import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; @@ -133,37 +142,84 @@ public class MQTTUtil { public static void logMessage(MQTTSessionState state, MqttMessage message, boolean inbound) { if (logger.isTraceEnabled()) { - - StringBuilder log = new StringBuilder("MQTT("); - - if (state != null) { - log.append(state.getClientId()); - } - - if (inbound) { - log.append("): IN << "); - } else { - log.append("): OUT >> "); - } - - if (message.fixedHeader() != null) { - log.append(message.fixedHeader().messageType().toString()); - - if (message.variableHeader() instanceof MqttPublishVariableHeader) { - log.append("(" + ((MqttPublishVariableHeader) message.variableHeader()).messageId() + ") " + message.fixedHeader().qosLevel()); - } else if (message.variableHeader() instanceof MqttMessageIdVariableHeader) { - log.append("(" + ((MqttMessageIdVariableHeader) message.variableHeader()).messageId() + ")"); - } - - if (message.fixedHeader().messageType() == MqttMessageType.SUBSCRIBE) { - for (MqttTopicSubscription sub : ((MqttSubscribeMessage) message).payload().topicSubscriptions()) { - log.append("\n\t" + sub.topicName() + " : " + sub.qualityOfService()); - } - } - - logger.trace(log.toString()); - } + traceMessage(state, message, inbound); } } + public static void traceMessage(MQTTSessionState state, MqttMessage message, boolean inbound) { + StringBuilder log = new StringBuilder("MQTT("); + + if (state != null) { + log.append(state.getClientId()); + } + + if (inbound) { + log.append("): IN << "); + } else { + log.append("): OUT >> "); + } + + if (message.fixedHeader() != null) { + log.append(message.fixedHeader().messageType().toString()); + + if (message.variableHeader() instanceof MqttMessageIdVariableHeader) { + log.append("(" + ((MqttMessageIdVariableHeader) message.variableHeader()).messageId() + ")"); + } + + switch (message.fixedHeader().messageType()) { + case PUBLISH: + MqttPublishVariableHeader publishHeader = (MqttPublishVariableHeader) message.variableHeader(); + String publishPayload = ((MqttPublishMessage)message).payload().toString(StandardCharsets.UTF_8); + final int maxPayloadLogSize = 256; + log.append("(" + publishHeader.packetId() + ")") + .append(" topic=" + publishHeader.topicName()) + .append(", qos=" + message.fixedHeader().qosLevel()) + .append(", retain=" + message.fixedHeader().isRetain()) + .append(", dup=" + message.fixedHeader().isDup()) + .append(", payload=" + (publishPayload.length() > maxPayloadLogSize ? publishPayload.substring(0, maxPayloadLogSize) : publishPayload)); + break; + case CONNECT: + MqttConnectVariableHeader connectHeader = (MqttConnectVariableHeader) message.variableHeader(); + MqttConnectPayload payload = ((MqttConnectMessage)message).payload(); + log.append(" protocol=(").append(connectHeader.name()).append(", ").append(connectHeader.version()).append(")") + .append(", hasPassword=").append(connectHeader.hasPassword()) + .append(", isCleanSession=").append(connectHeader.isCleanSession()) + .append(", keepAliveTimeSeconds=").append(connectHeader.keepAliveTimeSeconds()) + .append(", clientIdentifier=").append(payload.clientIdentifier()) + .append(", hasUserName=").append(connectHeader.hasUserName()); + if (connectHeader.hasUserName()) { + log.append(", userName=").append(payload.userName()); + } + log.append(", isWillFlag=").append(connectHeader.isWillFlag()); + if (connectHeader.isWillFlag()) { + log.append(", willQos=").append(connectHeader.willQos()) + .append(", isWillRetain=").append(connectHeader.isWillRetain()) + .append(", willTopic=").append(payload.willTopic()); + } + break; + case CONNACK: + MqttConnAckVariableHeader connackHeader = (MqttConnAckVariableHeader) message.variableHeader(); + log.append(" connectReturnCode=").append(connackHeader.connectReturnCode().byteValue()) + .append(", sessionPresent=").append(connackHeader.isSessionPresent()); + break; + case SUBSCRIBE: + for (MqttTopicSubscription sub : ((MqttSubscribeMessage) message).payload().topicSubscriptions()) { + log.append("\n\t" + sub.topicName() + " : " + sub.qualityOfService()); + } + break; + case SUBACK: + for (Integer qos : ((MqttSubAckMessage) message).payload().grantedQoSLevels()) { + log.append("\n\t" + qos); + } + break; + case UNSUBSCRIBE: + for (String topic : ((MqttUnsubscribeMessage) message).payload().topics()) { + log.append("\n\t" + topic); + } + break; + } + + logger.trace(log.toString()); + } + } } diff --git a/docs/user-manual/en/protocols-interoperability.md b/docs/user-manual/en/protocols-interoperability.md index f371fb1401..8755935317 100644 --- a/docs/user-manual/en/protocols-interoperability.md +++ b/docs/user-manual/en/protocols-interoperability.md @@ -243,6 +243,21 @@ the broker will proceed to publish the will message to the specified address (as Other subscribers to the will topic will receive the will message and can react accordingly. This feature can be useful in an IoT style scenario to detect errors across a potentially large scale deployment of devices. +### Debug Logging + +Detailed protocol logging (e.g. packets in/out) can be activated via the following steps: + +1) Open `/etc/logging.properties` +2) Add `org.apache.activemq.artemis.core.protocol.mqtt` to the `loggers` list. +3) Add this line to enable `TRACE` logging for this new logger: `logger.org.apache.activemq.artemis.core.protocol.mqtt.level=TRACE` +4) Ensure the `level` for the `handler` you want to log the message doesn't block the `TRACE` logging. For example, + modify the `level` of the `CONSOLE` `handler` like so: `handler.CONSOLE.level=TRACE` + +The MQTT specification doesn't dictate the format of the payloads which clients publish. As far as the broker is +concerned a payload is just just an array of bytes. However, to facilitate logging the broker will encode the payloads +as UTF-8 strings and print them up to 256 characters. Payload logging is limited to avoid filling the logs with potentially +hundreds of megabytes of unhelpful information. + ### Wild card subscriptions