Implement support for intercepting additional MQTT control packets
Previously, only the PUBLISH packet was intercepted. This patch modifies the code to add support for the other incoming/outgoing MQTT control packets.
This commit is contained in:
parent
9cdff41da4
commit
654ea69e78
|
@ -172,6 +172,8 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
|
||||||
|
|
||||||
String clientId = connect.payload().clientIdentifier();
|
String clientId = connect.payload().clientIdentifier();
|
||||||
session.getConnectionManager().connect(clientId, connect.payload().userName(), connect.payload().passwordInBytes(), connect.variableHeader().isWillFlag(), connect.payload().willMessageInBytes(), connect.payload().willTopic(), connect.variableHeader().isWillRetain(), connect.variableHeader().willQos(), connect.variableHeader().isCleanSession());
|
session.getConnectionManager().connect(clientId, connect.payload().userName(), connect.payload().passwordInBytes(), connect.variableHeader().isWillFlag(), connect.payload().willMessageInBytes(), connect.payload().willTopic(), connect.variableHeader().isWillRetain(), connect.variableHeader().willQos(), connect.variableHeader().isCleanSession());
|
||||||
|
|
||||||
|
this.protocolManager.invokeIncoming(connect, this.connection);
|
||||||
}
|
}
|
||||||
|
|
||||||
void disconnect(boolean error) {
|
void disconnect(boolean error) {
|
||||||
|
@ -183,6 +185,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
|
||||||
MqttConnAckVariableHeader varHeader = new MqttConnAckVariableHeader(returnCode, true);
|
MqttConnAckVariableHeader varHeader = new MqttConnAckVariableHeader(returnCode, true);
|
||||||
MqttConnAckMessage message = new MqttConnAckMessage(fixedHeader, varHeader);
|
MqttConnAckMessage message = new MqttConnAckMessage(fixedHeader, varHeader);
|
||||||
|
|
||||||
|
this.protocolManager.invokeOutgoing(message, this.connection);
|
||||||
ctx.write(message);
|
ctx.write(message);
|
||||||
ctx.flush();
|
ctx.flush();
|
||||||
}
|
}
|
||||||
|
@ -225,30 +228,43 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
|
||||||
MqttFixedHeader fixedHeader = new MqttFixedHeader(messageType, false, qos, // Spec requires 01 in header for rel
|
MqttFixedHeader fixedHeader = new MqttFixedHeader(messageType, false, qos, // Spec requires 01 in header for rel
|
||||||
false, 0);
|
false, 0);
|
||||||
MqttPubAckMessage rel = new MqttPubAckMessage(fixedHeader, MqttMessageIdVariableHeader.from(messageId));
|
MqttPubAckMessage rel = new MqttPubAckMessage(fixedHeader, MqttMessageIdVariableHeader.from(messageId));
|
||||||
|
|
||||||
|
this.protocolManager.invokeOutgoing(rel, this.connection);
|
||||||
|
|
||||||
ctx.write(rel);
|
ctx.write(rel);
|
||||||
ctx.flush();
|
ctx.flush();
|
||||||
}
|
}
|
||||||
|
|
||||||
void handlePuback(MqttPubAckMessage message) throws Exception {
|
void handlePuback(MqttPubAckMessage message) throws Exception {
|
||||||
|
this.protocolManager.invokeIncoming(message, this.connection);
|
||||||
|
|
||||||
session.getMqttPublishManager().handlePubAck(message.variableHeader().messageId());
|
session.getMqttPublishManager().handlePubAck(message.variableHeader().messageId());
|
||||||
}
|
}
|
||||||
|
|
||||||
void handlePubrec(MqttMessage message) throws Exception {
|
void handlePubrec(MqttMessage message) throws Exception {
|
||||||
|
this.protocolManager.invokeIncoming(message, this.connection);
|
||||||
|
|
||||||
int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
|
int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
|
||||||
session.getMqttPublishManager().handlePubRec(messageId);
|
session.getMqttPublishManager().handlePubRec(messageId);
|
||||||
}
|
}
|
||||||
|
|
||||||
void handlePubrel(MqttMessage message) {
|
void handlePubrel(MqttMessage message) {
|
||||||
|
this.protocolManager.invokeIncoming(message, this.connection);
|
||||||
|
|
||||||
int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
|
int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
|
||||||
session.getMqttPublishManager().handlePubRel(messageId);
|
session.getMqttPublishManager().handlePubRel(messageId);
|
||||||
}
|
}
|
||||||
|
|
||||||
void handlePubcomp(MqttMessage message) throws Exception {
|
void handlePubcomp(MqttMessage message) throws Exception {
|
||||||
|
this.protocolManager.invokeIncoming(message, this.connection);
|
||||||
|
|
||||||
int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
|
int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
|
||||||
session.getMqttPublishManager().handlePubComp(messageId);
|
session.getMqttPublishManager().handlePubComp(messageId);
|
||||||
}
|
}
|
||||||
|
|
||||||
void handleSubscribe(MqttSubscribeMessage message, ChannelHandlerContext ctx) throws Exception {
|
void handleSubscribe(MqttSubscribeMessage message, ChannelHandlerContext ctx) throws Exception {
|
||||||
|
this.protocolManager.invokeIncoming(message, this.connection);
|
||||||
|
|
||||||
MQTTSubscriptionManager subscriptionManager = session.getSubscriptionManager();
|
MQTTSubscriptionManager subscriptionManager = session.getSubscriptionManager();
|
||||||
int[] qos = subscriptionManager.addSubscriptions(message.payload().topicSubscriptions());
|
int[] qos = subscriptionManager.addSubscriptions(message.payload().topicSubscriptions());
|
||||||
|
|
||||||
|
@ -264,6 +280,8 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
|
||||||
}
|
}
|
||||||
|
|
||||||
void handleUnsubscribe(MqttUnsubscribeMessage message) throws Exception {
|
void handleUnsubscribe(MqttUnsubscribeMessage message) throws Exception {
|
||||||
|
this.protocolManager.invokeIncoming(message, this.connection);
|
||||||
|
|
||||||
session.getSubscriptionManager().removeSubscriptions(message.payload().topics());
|
session.getSubscriptionManager().removeSubscriptions(message.payload().topics());
|
||||||
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
|
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
|
||||||
MqttUnsubAckMessage m = new MqttUnsubAckMessage(header, message.variableHeader());
|
MqttUnsubAckMessage m = new MqttUnsubAckMessage(header, message.variableHeader());
|
||||||
|
@ -273,10 +291,14 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
|
||||||
}
|
}
|
||||||
|
|
||||||
void handleUnsuback(MqttUnsubAckMessage message) {
|
void handleUnsuback(MqttUnsubAckMessage message) {
|
||||||
|
this.protocolManager.invokeOutgoing(message, this.connection);
|
||||||
|
|
||||||
disconnect(true);
|
disconnect(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
void handlePingreq(MqttMessage message, ChannelHandlerContext ctx) {
|
void handlePingreq(MqttMessage message, ChannelHandlerContext ctx) {
|
||||||
|
this.protocolManager.invokeIncoming(message, this.connection);
|
||||||
|
|
||||||
MqttMessage pingResp = new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0));
|
MqttMessage pingResp = new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0));
|
||||||
MQTTUtil.logMessage(session.getSessionState(), pingResp, false);
|
MQTTUtil.logMessage(session.getSessionState(), pingResp, false);
|
||||||
ctx.write(pingResp);
|
ctx.write(pingResp);
|
||||||
|
@ -288,6 +310,8 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
|
||||||
}
|
}
|
||||||
|
|
||||||
void handleDisconnect(MqttMessage message) {
|
void handleDisconnect(MqttMessage message) {
|
||||||
|
this.protocolManager.invokeIncoming(message, this.connection);
|
||||||
|
|
||||||
disconnect(false);
|
disconnect(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -296,6 +320,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
|
||||||
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qosLevel), isRetain, 0);
|
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qosLevel), isRetain, 0);
|
||||||
MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topicName, messageId);
|
MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topicName, messageId);
|
||||||
MqttMessage publish = new MqttPublishMessage(header, varHeader, payload);
|
MqttMessage publish = new MqttPublishMessage(header, varHeader, payload);
|
||||||
|
|
||||||
this.protocolManager.invokeOutgoing(publish, connection);
|
this.protocolManager.invokeOutgoing(publish, connection);
|
||||||
|
|
||||||
MQTTUtil.logMessage(session.getSessionState(), publish, false);
|
MQTTUtil.logMessage(session.getSessionState(), publish, false);
|
||||||
|
|
|
@ -41,14 +41,14 @@ public class InterceptorExample {
|
||||||
System.out.println("Connected to Artemis");
|
System.out.println("Connected to Artemis");
|
||||||
|
|
||||||
// Subscribe to a topic
|
// Subscribe to a topic
|
||||||
Topic[] topics = {new Topic("mqtt/example/interceptor", QoS.AT_LEAST_ONCE)};
|
Topic[] topics = {new Topic("mqtt/example/interceptor", QoS.EXACTLY_ONCE)};
|
||||||
connection.subscribe(topics);
|
connection.subscribe(topics);
|
||||||
System.out.println("Subscribed to topics.");
|
System.out.println("Subscribed to topics.");
|
||||||
|
|
||||||
// Publish message
|
// Publish message
|
||||||
String payload1 = "This is message 1";
|
String payload1 = "This is message 1";
|
||||||
|
|
||||||
connection.publish("mqtt/example/interceptor", payload1.getBytes(), QoS.AT_LEAST_ONCE, false);
|
connection.publish("mqtt/example/interceptor", payload1.getBytes(), QoS.EXACTLY_ONCE, false);
|
||||||
|
|
||||||
System.out.println("Sent message");
|
System.out.println("Sent message");
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.artemis.mqtt.example;
|
||||||
import java.nio.charset.Charset;
|
import java.nio.charset.Charset;
|
||||||
|
|
||||||
import io.netty.handler.codec.mqtt.MqttPublishMessage;
|
import io.netty.handler.codec.mqtt.MqttPublishMessage;
|
||||||
|
import io.netty.handler.codec.mqtt.MqttConnectMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
|
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
|
||||||
|
|
||||||
|
|
||||||
|
@ -36,7 +37,9 @@ public class SimpleMQTTInterceptor implements MQTTInterceptor {
|
||||||
public boolean intercept(final MqttMessage mqttMessage, RemotingConnection connection) {
|
public boolean intercept(final MqttMessage mqttMessage, RemotingConnection connection) {
|
||||||
System.out.println("MQTT Interceptor gets called ");
|
System.out.println("MQTT Interceptor gets called ");
|
||||||
|
|
||||||
|
System.out.println("A MQTT control packet was intercepted " + mqttMessage.fixedHeader().messageType());
|
||||||
|
|
||||||
|
// If you need to handle an specific packet type:
|
||||||
if (mqttMessage instanceof MqttPublishMessage) {
|
if (mqttMessage instanceof MqttPublishMessage) {
|
||||||
MqttPublishMessage message = (MqttPublishMessage) mqttMessage;
|
MqttPublishMessage message = (MqttPublishMessage) mqttMessage;
|
||||||
|
|
||||||
|
@ -49,6 +52,12 @@ public class SimpleMQTTInterceptor implements MQTTInterceptor {
|
||||||
|
|
||||||
message.payload().setBytes(0, modifiedMessage.getBytes());
|
message.payload().setBytes(0, modifiedMessage.getBytes());
|
||||||
}
|
}
|
||||||
|
else {
|
||||||
|
if (mqttMessage instanceof MqttConnectMessage) {
|
||||||
|
MqttConnectMessage connectMessage = (MqttConnectMessage) mqttMessage;
|
||||||
|
System.out.println("A MQTT CONNECT control packet was intercepted " + connectMessage);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// We return true which means "call next interceptor" (if there is one) or target.
|
// We return true which means "call next interceptor" (if there is one) or target.
|
||||||
|
|
|
@ -39,6 +39,10 @@ under the License.
|
||||||
<class-name>org.apache.activemq.artemis.mqtt.example.SimpleMQTTInterceptor</class-name>
|
<class-name>org.apache.activemq.artemis.mqtt.example.SimpleMQTTInterceptor</class-name>
|
||||||
</remoting-incoming-interceptors>
|
</remoting-incoming-interceptors>
|
||||||
|
|
||||||
|
<remoting-outgoing-interceptors>
|
||||||
|
<class-name>org.apache.activemq.artemis.mqtt.example.SimpleMQTTInterceptor</class-name>
|
||||||
|
</remoting-outgoing-interceptors>
|
||||||
|
|
||||||
<bindings-directory>./data/bindings</bindings-directory>
|
<bindings-directory>./data/bindings</bindings-directory>
|
||||||
|
|
||||||
<journal-directory>./data/journal</journal-directory>
|
<journal-directory>./data/journal</journal-directory>
|
||||||
|
|
Loading…
Reference in New Issue