ARTEMIS-1358 refactor MQTTProtocolManager a bit

This commit is contained in:
Justin Bertram 2017-08-21 09:46:27 -05:00 committed by Clebert Suconic
parent e82f611e2e
commit a965b6d2ee
2 changed files with 34 additions and 109 deletions

View File

@ -17,8 +17,6 @@
package org.apache.activemq.artemis.core.protocol.mqtt; package org.apache.activemq.artemis.core.protocol.mqtt;
import java.util.Map;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
@ -40,14 +38,12 @@ import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage; import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage; import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
/** /**
* This class is responsible for receiving and sending MQTT packets, delegating behaviour to one of the * This class is responsible for receiving and sending MQTT packets, delegating behaviour to one of the
* MQTTConnectionManager, MQTTPublishMananger, MQTTSubscriptionManager classes. * MQTTConnectionManager, MQTTPublishManager, MQTTSubscriptionManager classes.
*/ */
public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
@ -68,12 +64,9 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
private boolean stopped = false; private boolean stopped = false;
private Map<SimpleString, RoutingType> prefixes;
public MQTTProtocolHandler(ActiveMQServer server, MQTTProtocolManager protocolManager) { public MQTTProtocolHandler(ActiveMQServer server, MQTTProtocolManager protocolManager) {
this.server = server; this.server = server;
this.protocolManager = protocolManager; this.protocolManager = protocolManager;
this.prefixes = protocolManager.getPrefixes();
} }
void setConnection(MQTTConnection connection, ConnectionEntry entry) throws Exception { void setConnection(MQTTConnection connection, ConnectionEntry entry) throws Exception {
@ -82,7 +75,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
this.session = new MQTTSession(this, connection, protocolManager, server.getConfiguration().getWildcardConfiguration()); this.session = new MQTTSession(this, connection, protocolManager, server.getConfiguration().getWildcardConfiguration());
} }
void stop(boolean error) { void stop() {
stopped = true; stopped = true;
} }
@ -107,13 +100,12 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
MQTTUtil.logMessage(session.getState(), message, true); MQTTUtil.logMessage(session.getState(), message, true);
this.protocolManager.invokeIncoming(message, this.connection);
switch (message.fixedHeader().messageType()) { switch (message.fixedHeader().messageType()) {
case CONNECT: case CONNECT:
handleConnect((MqttConnectMessage) message, ctx); handleConnect((MqttConnectMessage) message, ctx);
break; break;
case CONNACK:
handleConnack((MqttConnAckMessage) message);
break;
case PUBLISH: case PUBLISH:
handlePublish((MqttPublishMessage) message); handlePublish((MqttPublishMessage) message);
break; break;
@ -130,26 +122,21 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
handlePubcomp(message); handlePubcomp(message);
break; break;
case SUBSCRIBE: case SUBSCRIBE:
handleSubscribe((MqttSubscribeMessage) message, ctx); handleSubscribe((MqttSubscribeMessage) message);
break;
case SUBACK:
handleSuback((MqttSubAckMessage) message);
break; break;
case UNSUBSCRIBE: case UNSUBSCRIBE:
handleUnsubscribe((MqttUnsubscribeMessage) message); handleUnsubscribe((MqttUnsubscribeMessage) message);
break; break;
case UNSUBACK:
handleUnsuback((MqttUnsubAckMessage) message);
break;
case PINGREQ: case PINGREQ:
handlePingreq(message, ctx); handlePingreq();
break;
case PINGRESP:
handlePingresp(message);
break; break;
case DISCONNECT: case DISCONNECT:
handleDisconnect(message); disconnect(false);
break; break;
case UNSUBACK:
case SUBACK:
case PINGRESP:
case CONNACK: // The server does not instantiate connections therefore any CONNACK received over a connection is an invalid control message.
default: default:
disconnect(true); disconnect(true);
} }
@ -172,8 +159,6 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
String clientId = connect.payload().clientIdentifier(); String clientId = connect.payload().clientIdentifier();
session.getConnectionManager().connect(clientId, connect.payload().userName(), connect.payload().passwordInBytes(), connect.variableHeader().isWillFlag(), connect.payload().willMessageInBytes(), connect.payload().willTopic(), connect.variableHeader().isWillRetain(), connect.variableHeader().willQos(), connect.variableHeader().isCleanSession()); session.getConnectionManager().connect(clientId, connect.payload().userName(), connect.payload().passwordInBytes(), connect.variableHeader().isWillFlag(), connect.payload().willMessageInBytes(), connect.payload().willTopic(), connect.variableHeader().isWillRetain(), connect.variableHeader().willQos(), connect.variableHeader().isCleanSession());
this.protocolManager.invokeIncoming(connect, this.connection);
} }
void disconnect(boolean error) { void disconnect(boolean error) {
@ -184,27 +169,11 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttConnAckVariableHeader varHeader = new MqttConnAckVariableHeader(returnCode, true); MqttConnAckVariableHeader varHeader = new MqttConnAckVariableHeader(returnCode, true);
MqttConnAckMessage message = new MqttConnAckMessage(fixedHeader, varHeader); MqttConnAckMessage message = new MqttConnAckMessage(fixedHeader, varHeader);
sendToClient(message);
this.protocolManager.invokeOutgoing(message, this.connection);
ctx.write(message);
ctx.flush();
}
/**
* The server does not instantiate connections therefore any CONNACK received over a connection is an invalid
* control message.
*
* @param message
*/
void handleConnack(MqttConnAckMessage message) {
log.debug("Received invalid CONNACK from client: " + session.getSessionState().getClientId());
log.debug("Disconnecting client: " + session.getSessionState().getClientId());
disconnect(true);
} }
void handlePublish(MqttPublishMessage message) throws Exception { void handlePublish(MqttPublishMessage message) throws Exception {
this.protocolManager.invokeIncoming(message, this.connection); session.getMqttPublishManager().handleMessage(message.variableHeader().packetId(), message.variableHeader().topicName(), message.fixedHeader().qosLevel().value(), message.payload(), message.fixedHeader().isRetain());
session.getMqttPublishManager().handleMessage(message.variableHeader().messageId(), message.variableHeader().topicName(), message.fixedHeader().qosLevel().value(), message.payload(), message.fixedHeader().isRetain());
} }
void sendPubAck(int messageId) { void sendPubAck(int messageId) {
@ -228,107 +197,63 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
MqttFixedHeader fixedHeader = new MqttFixedHeader(messageType, false, qos, // Spec requires 01 in header for rel MqttFixedHeader fixedHeader = new MqttFixedHeader(messageType, false, qos, // Spec requires 01 in header for rel
false, 0); false, 0);
MqttPubAckMessage rel = new MqttPubAckMessage(fixedHeader, MqttMessageIdVariableHeader.from(messageId)); MqttPubAckMessage rel = new MqttPubAckMessage(fixedHeader, MqttMessageIdVariableHeader.from(messageId));
sendToClient(rel);
this.protocolManager.invokeOutgoing(rel, this.connection);
ctx.write(rel);
ctx.flush();
} }
void handlePuback(MqttPubAckMessage message) throws Exception { void handlePuback(MqttPubAckMessage message) throws Exception {
this.protocolManager.invokeIncoming(message, this.connection); session.getMqttPublishManager().handlePubAck(getMessageId(message));
session.getMqttPublishManager().handlePubAck(message.variableHeader().messageId());
} }
void handlePubrec(MqttMessage message) throws Exception { void handlePubrec(MqttMessage message) throws Exception {
this.protocolManager.invokeIncoming(message, this.connection); session.getMqttPublishManager().handlePubRec(getMessageId(message));
int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
session.getMqttPublishManager().handlePubRec(messageId);
} }
void handlePubrel(MqttMessage message) { void handlePubrel(MqttMessage message) {
this.protocolManager.invokeIncoming(message, this.connection); session.getMqttPublishManager().handlePubRel(getMessageId(message));
int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
session.getMqttPublishManager().handlePubRel(messageId);
} }
void handlePubcomp(MqttMessage message) throws Exception { void handlePubcomp(MqttMessage message) throws Exception {
this.protocolManager.invokeIncoming(message, this.connection); session.getMqttPublishManager().handlePubComp(getMessageId(message));
int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
session.getMqttPublishManager().handlePubComp(messageId);
} }
void handleSubscribe(MqttSubscribeMessage message, ChannelHandlerContext ctx) throws Exception { void handleSubscribe(MqttSubscribeMessage message) throws Exception {
this.protocolManager.invokeIncoming(message, this.connection);
MQTTSubscriptionManager subscriptionManager = session.getSubscriptionManager(); MQTTSubscriptionManager subscriptionManager = session.getSubscriptionManager();
int[] qos = subscriptionManager.addSubscriptions(message.payload().topicSubscriptions()); int[] qos = subscriptionManager.addSubscriptions(message.payload().topicSubscriptions());
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttSubAckMessage ack = new MqttSubAckMessage(header, message.variableHeader(), new MqttSubAckPayload(qos)); MqttSubAckMessage ack = new MqttSubAckMessage(header, message.variableHeader(), new MqttSubAckPayload(qos));
MQTTUtil.logMessage(session.getSessionState(), ack, false); sendToClient(ack);
ctx.write(ack);
ctx.flush();
}
void handleSuback(MqttSubAckMessage message) {
disconnect(true);
} }
void handleUnsubscribe(MqttUnsubscribeMessage message) throws Exception { void handleUnsubscribe(MqttUnsubscribeMessage message) throws Exception {
this.protocolManager.invokeIncoming(message, this.connection);
session.getSubscriptionManager().removeSubscriptions(message.payload().topics()); session.getSubscriptionManager().removeSubscriptions(message.payload().topics());
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttUnsubAckMessage m = new MqttUnsubAckMessage(header, message.variableHeader()); MqttUnsubAckMessage m = new MqttUnsubAckMessage(header, message.variableHeader());
MQTTUtil.logMessage(session.getSessionState(), m, false); sendToClient(m);
ctx.write(m);
ctx.flush();
} }
void handleUnsuback(MqttUnsubAckMessage message) { void handlePingreq() {
this.protocolManager.invokeOutgoing(message, this.connection);
disconnect(true);
}
void handlePingreq(MqttMessage message, ChannelHandlerContext ctx) {
this.protocolManager.invokeIncoming(message, this.connection);
MqttMessage pingResp = new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0)); MqttMessage pingResp = new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0));
MQTTUtil.logMessage(session.getSessionState(), pingResp, false); sendToClient(pingResp);
ctx.write(pingResp);
ctx.flush();
} }
void handlePingresp(MqttMessage message) { protected void send(int messageId, String topicName, int qosLevel, boolean isRetain, ByteBuf payload, int deliveryCount) {
disconnect(true);
}
void handleDisconnect(MqttMessage message) {
this.protocolManager.invokeIncoming(message, this.connection);
disconnect(false);
}
protected int send(int messageId, String topicName, int qosLevel, boolean isRetain, ByteBuf payload, int deliveryCount) {
boolean redelivery = qosLevel == 0 ? false : (deliveryCount > 0); boolean redelivery = qosLevel == 0 ? false : (deliveryCount > 0);
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qosLevel), isRetain, 0); MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qosLevel), isRetain, 0);
MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topicName, messageId); MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topicName, messageId);
MqttMessage publish = new MqttPublishMessage(header, varHeader, payload); MqttMessage publish = new MqttPublishMessage(header, varHeader, payload);
sendToClient(publish);
}
this.protocolManager.invokeOutgoing(publish, connection); private void sendToClient(MqttMessage message) {
MQTTUtil.logMessage(session.getSessionState(), message, false);
MQTTUtil.logMessage(session.getSessionState(), publish, false); this.protocolManager.invokeOutgoing(message, connection);
ctx.write(message);
ctx.write(publish);
ctx.flush(); ctx.flush();
}
return 1; private int getMessageId(MqttMessage message) {
return ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
} }
ActiveMQServer getServer() { ActiveMQServer getServer() {

View File

@ -90,7 +90,7 @@ public class MQTTSession {
// TODO ensure resources are cleaned up for GC. // TODO ensure resources are cleaned up for GC.
synchronized void stop() throws Exception { synchronized void stop() throws Exception {
if (!stopped) { if (!stopped) {
protocolHandler.stop(false); protocolHandler.stop();
subscriptionManager.stop(); subscriptionManager.stop();
mqttPublishManager.stop(); mqttPublishManager.stop();