This commit is contained in:
Clebert Suconic 2017-08-22 22:08:24 -04:00
commit ce01faeac7
5 changed files with 38 additions and 113 deletions

View File

@ -17,8 +17,6 @@
package org.apache.activemq.artemis.core.protocol.mqtt;
import java.util.Map;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
@ -40,14 +38,12 @@ import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import io.netty.util.ReferenceCountUtil;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
/**
* This class is responsible for receiving and sending MQTT packets, delegating behaviour to one of the
* MQTTConnectionManager, MQTTPublishMananger, MQTTSubscriptionManager classes.
* MQTTConnectionManager, MQTTPublishManager, MQTTSubscriptionManager classes.
*/
public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
@ -68,12 +64,9 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
private boolean stopped = false;
private Map<SimpleString, RoutingType> prefixes;
public MQTTProtocolHandler(ActiveMQServer server, MQTTProtocolManager protocolManager) {
this.server = server;
this.protocolManager = protocolManager;
this.prefixes = protocolManager.getPrefixes();
}
void setConnection(MQTTConnection connection, ConnectionEntry entry) throws Exception {
@ -82,7 +75,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
this.session = new MQTTSession(this, connection, protocolManager, server.getConfiguration().getWildcardConfiguration());
}
void stop(boolean error) {
void stop() {
stopped = true;
}
@ -107,13 +100,12 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
MQTTUtil.logMessage(session.getState(), message, true);
this.protocolManager.invokeIncoming(message, this.connection);
switch (message.fixedHeader().messageType()) {
case CONNECT:
handleConnect((MqttConnectMessage) message, ctx);
break;
case CONNACK:
handleConnack((MqttConnAckMessage) message);
break;
case PUBLISH:
handlePublish((MqttPublishMessage) message);
break;
@ -130,26 +122,21 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
handlePubcomp(message);
break;
case SUBSCRIBE:
handleSubscribe((MqttSubscribeMessage) message, ctx);
break;
case SUBACK:
handleSuback((MqttSubAckMessage) message);
handleSubscribe((MqttSubscribeMessage) message);
break;
case UNSUBSCRIBE:
handleUnsubscribe((MqttUnsubscribeMessage) message);
break;
case UNSUBACK:
handleUnsuback((MqttUnsubAckMessage) message);
break;
case PINGREQ:
handlePingreq(message, ctx);
break;
case PINGRESP:
handlePingresp(message);
handlePingreq();
break;
case DISCONNECT:
handleDisconnect(message);
disconnect(false);
break;
case UNSUBACK:
case SUBACK:
case PINGRESP:
case CONNACK: // The server does not instantiate connections therefore any CONNACK received over a connection is an invalid control message.
default:
disconnect(true);
}
@ -172,8 +159,6 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
String clientId = connect.payload().clientIdentifier();
session.getConnectionManager().connect(clientId, connect.payload().userName(), connect.payload().passwordInBytes(), connect.variableHeader().isWillFlag(), connect.payload().willMessageInBytes(), connect.payload().willTopic(), connect.variableHeader().isWillRetain(), connect.variableHeader().willQos(), connect.variableHeader().isCleanSession());
this.protocolManager.invokeIncoming(connect, this.connection);
}
void disconnect(boolean error) {
@ -184,27 +169,11 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttConnAckVariableHeader varHeader = new MqttConnAckVariableHeader(returnCode, true);
MqttConnAckMessage message = new MqttConnAckMessage(fixedHeader, varHeader);
this.protocolManager.invokeOutgoing(message, this.connection);
ctx.write(message);
ctx.flush();
}
/**
* The server does not instantiate connections therefore any CONNACK received over a connection is an invalid
* control message.
*
* @param message
*/
void handleConnack(MqttConnAckMessage message) {
log.debug("Received invalid CONNACK from client: " + session.getSessionState().getClientId());
log.debug("Disconnecting client: " + session.getSessionState().getClientId());
disconnect(true);
sendToClient(message);
}
void handlePublish(MqttPublishMessage message) throws Exception {
this.protocolManager.invokeIncoming(message, this.connection);
session.getMqttPublishManager().handleMessage(message.variableHeader().messageId(), message.variableHeader().topicName(), message.fixedHeader().qosLevel().value(), message.payload(), message.fixedHeader().isRetain());
session.getMqttPublishManager().handleMessage(message.variableHeader().packetId(), message.variableHeader().topicName(), message.fixedHeader().qosLevel().value(), message.payload(), message.fixedHeader().isRetain());
}
void sendPubAck(int messageId) {
@ -228,107 +197,63 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
MqttFixedHeader fixedHeader = new MqttFixedHeader(messageType, false, qos, // Spec requires 01 in header for rel
false, 0);
MqttPubAckMessage rel = new MqttPubAckMessage(fixedHeader, MqttMessageIdVariableHeader.from(messageId));
this.protocolManager.invokeOutgoing(rel, this.connection);
ctx.write(rel);
ctx.flush();
sendToClient(rel);
}
void handlePuback(MqttPubAckMessage message) throws Exception {
this.protocolManager.invokeIncoming(message, this.connection);
session.getMqttPublishManager().handlePubAck(message.variableHeader().messageId());
session.getMqttPublishManager().handlePubAck(getMessageId(message));
}
void handlePubrec(MqttMessage message) throws Exception {
this.protocolManager.invokeIncoming(message, this.connection);
int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
session.getMqttPublishManager().handlePubRec(messageId);
session.getMqttPublishManager().handlePubRec(getMessageId(message));
}
void handlePubrel(MqttMessage message) {
this.protocolManager.invokeIncoming(message, this.connection);
int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
session.getMqttPublishManager().handlePubRel(messageId);
session.getMqttPublishManager().handlePubRel(getMessageId(message));
}
void handlePubcomp(MqttMessage message) throws Exception {
this.protocolManager.invokeIncoming(message, this.connection);
int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
session.getMqttPublishManager().handlePubComp(messageId);
session.getMqttPublishManager().handlePubComp(getMessageId(message));
}
void handleSubscribe(MqttSubscribeMessage message, ChannelHandlerContext ctx) throws Exception {
this.protocolManager.invokeIncoming(message, this.connection);
void handleSubscribe(MqttSubscribeMessage message) throws Exception {
MQTTSubscriptionManager subscriptionManager = session.getSubscriptionManager();
int[] qos = subscriptionManager.addSubscriptions(message.payload().topicSubscriptions());
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttSubAckMessage ack = new MqttSubAckMessage(header, message.variableHeader(), new MqttSubAckPayload(qos));
MQTTUtil.logMessage(session.getSessionState(), ack, false);
ctx.write(ack);
ctx.flush();
}
void handleSuback(MqttSubAckMessage message) {
disconnect(true);
sendToClient(ack);
}
void handleUnsubscribe(MqttUnsubscribeMessage message) throws Exception {
this.protocolManager.invokeIncoming(message, this.connection);
session.getSubscriptionManager().removeSubscriptions(message.payload().topics());
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttUnsubAckMessage m = new MqttUnsubAckMessage(header, message.variableHeader());
MQTTUtil.logMessage(session.getSessionState(), m, false);
ctx.write(m);
ctx.flush();
sendToClient(m);
}
void handleUnsuback(MqttUnsubAckMessage message) {
this.protocolManager.invokeOutgoing(message, this.connection);
disconnect(true);
}
void handlePingreq(MqttMessage message, ChannelHandlerContext ctx) {
this.protocolManager.invokeIncoming(message, this.connection);
void handlePingreq() {
MqttMessage pingResp = new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0));
MQTTUtil.logMessage(session.getSessionState(), pingResp, false);
ctx.write(pingResp);
ctx.flush();
sendToClient(pingResp);
}
void handlePingresp(MqttMessage message) {
disconnect(true);
}
void handleDisconnect(MqttMessage message) {
this.protocolManager.invokeIncoming(message, this.connection);
disconnect(false);
}
protected int send(int messageId, String topicName, int qosLevel, boolean isRetain, ByteBuf payload, int deliveryCount) {
protected void send(int messageId, String topicName, int qosLevel, boolean isRetain, ByteBuf payload, int deliveryCount) {
boolean redelivery = qosLevel == 0 ? false : (deliveryCount > 0);
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qosLevel), isRetain, 0);
MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topicName, messageId);
MqttMessage publish = new MqttPublishMessage(header, varHeader, payload);
sendToClient(publish);
}
this.protocolManager.invokeOutgoing(publish, connection);
MQTTUtil.logMessage(session.getSessionState(), publish, false);
ctx.write(publish);
private void sendToClient(MqttMessage message) {
MQTTUtil.logMessage(session.getSessionState(), message, false);
this.protocolManager.invokeOutgoing(message, connection);
ctx.write(message);
ctx.flush();
}
return 1;
private int getMessageId(MqttMessage message) {
return ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
}
ActiveMQServer getServer() {

View File

@ -90,7 +90,7 @@ public class MQTTSession {
// TODO ensure resources are cleaned up for GC.
synchronized void stop() throws Exception {
if (!stopped) {
protocolHandler.stop(false);
protocolHandler.stop();
subscriptionManager.stop();
mqttPublishManager.stop();

View File

@ -54,7 +54,7 @@ public class InterceptorExample {
// Receive the sent message
Message message1 = connection.receive(5, TimeUnit.SECONDS);
String messagePayload = new String(message1.getPayload(), StandardCharsets.UTF_8);
System.out.println("Received message: " + messagePayload);

View File

@ -51,8 +51,7 @@ public class SimpleMQTTInterceptor implements MQTTInterceptor {
String modifiedMessage = "Modified message ";
message.payload().setBytes(0, modifiedMessage.getBytes());
}
else {
} else {
if (mqttMessage instanceof MqttConnectMessage) {
MqttConnectMessage connectMessage = (MqttConnectMessage) mqttMessage;
System.out.println("A MQTT CONNECT control packet was intercepted " + connectMessage);

View File

@ -99,7 +99,8 @@ public class MQTTTest extends MQTTTestSupport {
mqtt.setVersion(version);
connection = mqtt.blockingConnection();
connection.connect();
assertTrue(connection.isConnected());
BlockingConnection finalConnection = connection;
assertTrue("Should be connected", Wait.waitFor(() -> finalConnection.isConnected(), 5000, 100));
} finally {
if (connection != null && connection.isConnected()) connection.disconnect();
}