ARTEMIS-1465 improve MQTT protocol logging
This commit is contained in:
parent
bf0137e73d
commit
499f737343
|
@ -17,13 +17,22 @@
|
|||
|
||||
package org.apache.activemq.artemis.core.protocol.mqtt;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
|
||||
import io.netty.handler.codec.mqtt.MqttConnectMessage;
|
||||
import io.netty.handler.codec.mqtt.MqttConnectPayload;
|
||||
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
|
||||
import io.netty.handler.codec.mqtt.MqttMessage;
|
||||
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
|
||||
import io.netty.handler.codec.mqtt.MqttMessageType;
|
||||
import io.netty.handler.codec.mqtt.MqttPublishMessage;
|
||||
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
|
||||
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
|
||||
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
|
||||
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
|
||||
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
|
||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
|
@ -133,37 +142,84 @@ public class MQTTUtil {
|
|||
|
||||
public static void logMessage(MQTTSessionState state, MqttMessage message, boolean inbound) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
|
||||
StringBuilder log = new StringBuilder("MQTT(");
|
||||
|
||||
if (state != null) {
|
||||
log.append(state.getClientId());
|
||||
}
|
||||
|
||||
if (inbound) {
|
||||
log.append("): IN << ");
|
||||
} else {
|
||||
log.append("): OUT >> ");
|
||||
}
|
||||
|
||||
if (message.fixedHeader() != null) {
|
||||
log.append(message.fixedHeader().messageType().toString());
|
||||
|
||||
if (message.variableHeader() instanceof MqttPublishVariableHeader) {
|
||||
log.append("(" + ((MqttPublishVariableHeader) message.variableHeader()).messageId() + ") " + message.fixedHeader().qosLevel());
|
||||
} else if (message.variableHeader() instanceof MqttMessageIdVariableHeader) {
|
||||
log.append("(" + ((MqttMessageIdVariableHeader) message.variableHeader()).messageId() + ")");
|
||||
}
|
||||
|
||||
if (message.fixedHeader().messageType() == MqttMessageType.SUBSCRIBE) {
|
||||
for (MqttTopicSubscription sub : ((MqttSubscribeMessage) message).payload().topicSubscriptions()) {
|
||||
log.append("\n\t" + sub.topicName() + " : " + sub.qualityOfService());
|
||||
}
|
||||
}
|
||||
|
||||
logger.trace(log.toString());
|
||||
}
|
||||
traceMessage(state, message, inbound);
|
||||
}
|
||||
}
|
||||
|
||||
public static void traceMessage(MQTTSessionState state, MqttMessage message, boolean inbound) {
|
||||
StringBuilder log = new StringBuilder("MQTT(");
|
||||
|
||||
if (state != null) {
|
||||
log.append(state.getClientId());
|
||||
}
|
||||
|
||||
if (inbound) {
|
||||
log.append("): IN << ");
|
||||
} else {
|
||||
log.append("): OUT >> ");
|
||||
}
|
||||
|
||||
if (message.fixedHeader() != null) {
|
||||
log.append(message.fixedHeader().messageType().toString());
|
||||
|
||||
if (message.variableHeader() instanceof MqttMessageIdVariableHeader) {
|
||||
log.append("(" + ((MqttMessageIdVariableHeader) message.variableHeader()).messageId() + ")");
|
||||
}
|
||||
|
||||
switch (message.fixedHeader().messageType()) {
|
||||
case PUBLISH:
|
||||
MqttPublishVariableHeader publishHeader = (MqttPublishVariableHeader) message.variableHeader();
|
||||
String publishPayload = ((MqttPublishMessage)message).payload().toString(StandardCharsets.UTF_8);
|
||||
final int maxPayloadLogSize = 256;
|
||||
log.append("(" + publishHeader.packetId() + ")")
|
||||
.append(" topic=" + publishHeader.topicName())
|
||||
.append(", qos=" + message.fixedHeader().qosLevel())
|
||||
.append(", retain=" + message.fixedHeader().isRetain())
|
||||
.append(", dup=" + message.fixedHeader().isDup())
|
||||
.append(", payload=" + (publishPayload.length() > maxPayloadLogSize ? publishPayload.substring(0, maxPayloadLogSize) : publishPayload));
|
||||
break;
|
||||
case CONNECT:
|
||||
MqttConnectVariableHeader connectHeader = (MqttConnectVariableHeader) message.variableHeader();
|
||||
MqttConnectPayload payload = ((MqttConnectMessage)message).payload();
|
||||
log.append(" protocol=(").append(connectHeader.name()).append(", ").append(connectHeader.version()).append(")")
|
||||
.append(", hasPassword=").append(connectHeader.hasPassword())
|
||||
.append(", isCleanSession=").append(connectHeader.isCleanSession())
|
||||
.append(", keepAliveTimeSeconds=").append(connectHeader.keepAliveTimeSeconds())
|
||||
.append(", clientIdentifier=").append(payload.clientIdentifier())
|
||||
.append(", hasUserName=").append(connectHeader.hasUserName());
|
||||
if (connectHeader.hasUserName()) {
|
||||
log.append(", userName=").append(payload.userName());
|
||||
}
|
||||
log.append(", isWillFlag=").append(connectHeader.isWillFlag());
|
||||
if (connectHeader.isWillFlag()) {
|
||||
log.append(", willQos=").append(connectHeader.willQos())
|
||||
.append(", isWillRetain=").append(connectHeader.isWillRetain())
|
||||
.append(", willTopic=").append(payload.willTopic());
|
||||
}
|
||||
break;
|
||||
case CONNACK:
|
||||
MqttConnAckVariableHeader connackHeader = (MqttConnAckVariableHeader) message.variableHeader();
|
||||
log.append(" connectReturnCode=").append(connackHeader.connectReturnCode().byteValue())
|
||||
.append(", sessionPresent=").append(connackHeader.isSessionPresent());
|
||||
break;
|
||||
case SUBSCRIBE:
|
||||
for (MqttTopicSubscription sub : ((MqttSubscribeMessage) message).payload().topicSubscriptions()) {
|
||||
log.append("\n\t" + sub.topicName() + " : " + sub.qualityOfService());
|
||||
}
|
||||
break;
|
||||
case SUBACK:
|
||||
for (Integer qos : ((MqttSubAckMessage) message).payload().grantedQoSLevels()) {
|
||||
log.append("\n\t" + qos);
|
||||
}
|
||||
break;
|
||||
case UNSUBSCRIBE:
|
||||
for (String topic : ((MqttUnsubscribeMessage) message).payload().topics()) {
|
||||
log.append("\n\t" + topic);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
logger.trace(log.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -243,6 +243,21 @@ the broker will proceed to publish the will message to the specified address (as
|
|||
Other subscribers to the will topic will receive the will message and can react accordingly. This feature can be useful
|
||||
in an IoT style scenario to detect errors across a potentially large scale deployment of devices.
|
||||
|
||||
### Debug Logging
|
||||
|
||||
Detailed protocol logging (e.g. packets in/out) can be activated via the following steps:
|
||||
|
||||
1) Open `<ARTEMIS_INSTANCE>/etc/logging.properties`
|
||||
2) Add `org.apache.activemq.artemis.core.protocol.mqtt` to the `loggers` list.
|
||||
3) Add this line to enable `TRACE` logging for this new logger: `logger.org.apache.activemq.artemis.core.protocol.mqtt.level=TRACE`
|
||||
4) Ensure the `level` for the `handler` you want to log the message doesn't block the `TRACE` logging. For example,
|
||||
modify the `level` of the `CONSOLE` `handler` like so: `handler.CONSOLE.level=TRACE`
|
||||
|
||||
The MQTT specification doesn't dictate the format of the payloads which clients publish. As far as the broker is
|
||||
concerned a payload is just just an array of bytes. However, to facilitate logging the broker will encode the payloads
|
||||
as UTF-8 strings and print them up to 256 characters. Payload logging is limited to avoid filling the logs with potentially
|
||||
hundreds of megabytes of unhelpful information.
|
||||
|
||||
|
||||
### Wild card subscriptions
|
||||
|
||||
|
|
Loading…
Reference in New Issue