From 3d89a5eb2138c7496332725b43c3f72acf60d09a Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Mon, 21 Aug 2017 09:41:48 -0500 Subject: [PATCH 1/3] NO-JIRA MQTT interceptor example checkstyle --- .../activemq/artemis/mqtt/example/InterceptorExample.java | 2 +- .../activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java b/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java index 4fb5abf530..e72c93f794 100644 --- a/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java +++ b/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java @@ -54,7 +54,7 @@ public class InterceptorExample { // Receive the sent message Message message1 = connection.receive(5, TimeUnit.SECONDS); - + String messagePayload = new String(message1.getPayload(), StandardCharsets.UTF_8); System.out.println("Received message: " + messagePayload); diff --git a/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java b/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java index 1b7b482e22..c705b81865 100644 --- a/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java +++ b/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java @@ -51,8 +51,7 @@ public class SimpleMQTTInterceptor implements MQTTInterceptor { String modifiedMessage = "Modified message "; message.payload().setBytes(0, modifiedMessage.getBytes()); - } - else { + } else { if (mqttMessage instanceof MqttConnectMessage) { MqttConnectMessage connectMessage = (MqttConnectMessage) mqttMessage; System.out.println("A MQTT CONNECT control packet was intercepted " + connectMessage); From e82f611e2e95e7b0fcd124f36f9c276eb1e4d2f9 Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Mon, 21 Aug 2017 09:46:04 -0500 Subject: [PATCH 2/3] NO-JIRA fix intermittently failing MQTT test --- .../artemis/tests/integration/mqtt/imported/MQTTTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index 2e67dd694a..794b002e9e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -99,7 +99,8 @@ public class MQTTTest extends MQTTTestSupport { mqtt.setVersion(version); connection = mqtt.blockingConnection(); connection.connect(); - assertTrue(connection.isConnected()); + BlockingConnection finalConnection = connection; + assertTrue("Should be connected", Wait.waitFor(() -> finalConnection.isConnected(), 5000, 100)); } finally { if (connection != null && connection.isConnected()) connection.disconnect(); } From a965b6d2ee40dc87df49ee32a456009a924fdcce Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Mon, 21 Aug 2017 09:46:27 -0500 Subject: [PATCH 3/3] ARTEMIS-1358 refactor MQTTProtocolManager a bit --- .../protocol/mqtt/MQTTProtocolHandler.java | 141 ++++-------------- .../core/protocol/mqtt/MQTTSession.java | 2 +- 2 files changed, 34 insertions(+), 109 deletions(-) diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java index 0c0be01f14..e7388e84a6 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java @@ -17,8 +17,6 @@ package org.apache.activemq.artemis.core.protocol.mqtt; -import java.util.Map; - import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -40,14 +38,12 @@ import io.netty.handler.codec.mqtt.MqttSubscribeMessage; import io.netty.handler.codec.mqtt.MqttUnsubAckMessage; import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage; import io.netty.util.ReferenceCountUtil; -import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; /** * This class is responsible for receiving and sending MQTT packets, delegating behaviour to one of the - * MQTTConnectionManager, MQTTPublishMananger, MQTTSubscriptionManager classes. + * MQTTConnectionManager, MQTTPublishManager, MQTTSubscriptionManager classes. */ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { @@ -68,12 +64,9 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { private boolean stopped = false; - private Map prefixes; - public MQTTProtocolHandler(ActiveMQServer server, MQTTProtocolManager protocolManager) { this.server = server; this.protocolManager = protocolManager; - this.prefixes = protocolManager.getPrefixes(); } void setConnection(MQTTConnection connection, ConnectionEntry entry) throws Exception { @@ -82,7 +75,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { this.session = new MQTTSession(this, connection, protocolManager, server.getConfiguration().getWildcardConfiguration()); } - void stop(boolean error) { + void stop() { stopped = true; } @@ -107,13 +100,12 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { MQTTUtil.logMessage(session.getState(), message, true); + this.protocolManager.invokeIncoming(message, this.connection); + switch (message.fixedHeader().messageType()) { case CONNECT: handleConnect((MqttConnectMessage) message, ctx); break; - case CONNACK: - handleConnack((MqttConnAckMessage) message); - break; case PUBLISH: handlePublish((MqttPublishMessage) message); break; @@ -130,26 +122,21 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { handlePubcomp(message); break; case SUBSCRIBE: - handleSubscribe((MqttSubscribeMessage) message, ctx); - break; - case SUBACK: - handleSuback((MqttSubAckMessage) message); + handleSubscribe((MqttSubscribeMessage) message); break; case UNSUBSCRIBE: handleUnsubscribe((MqttUnsubscribeMessage) message); break; - case UNSUBACK: - handleUnsuback((MqttUnsubAckMessage) message); - break; case PINGREQ: - handlePingreq(message, ctx); - break; - case PINGRESP: - handlePingresp(message); + handlePingreq(); break; case DISCONNECT: - handleDisconnect(message); + disconnect(false); break; + case UNSUBACK: + case SUBACK: + case PINGRESP: + case CONNACK: // The server does not instantiate connections therefore any CONNACK received over a connection is an invalid control message. default: disconnect(true); } @@ -172,8 +159,6 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { String clientId = connect.payload().clientIdentifier(); session.getConnectionManager().connect(clientId, connect.payload().userName(), connect.payload().passwordInBytes(), connect.variableHeader().isWillFlag(), connect.payload().willMessageInBytes(), connect.payload().willTopic(), connect.variableHeader().isWillRetain(), connect.variableHeader().willQos(), connect.variableHeader().isCleanSession()); - - this.protocolManager.invokeIncoming(connect, this.connection); } void disconnect(boolean error) { @@ -184,27 +169,11 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttConnAckVariableHeader varHeader = new MqttConnAckVariableHeader(returnCode, true); MqttConnAckMessage message = new MqttConnAckMessage(fixedHeader, varHeader); - - this.protocolManager.invokeOutgoing(message, this.connection); - ctx.write(message); - ctx.flush(); - } - - /** - * The server does not instantiate connections therefore any CONNACK received over a connection is an invalid - * control message. - * - * @param message - */ - void handleConnack(MqttConnAckMessage message) { - log.debug("Received invalid CONNACK from client: " + session.getSessionState().getClientId()); - log.debug("Disconnecting client: " + session.getSessionState().getClientId()); - disconnect(true); + sendToClient(message); } void handlePublish(MqttPublishMessage message) throws Exception { - this.protocolManager.invokeIncoming(message, this.connection); - session.getMqttPublishManager().handleMessage(message.variableHeader().messageId(), message.variableHeader().topicName(), message.fixedHeader().qosLevel().value(), message.payload(), message.fixedHeader().isRetain()); + session.getMqttPublishManager().handleMessage(message.variableHeader().packetId(), message.variableHeader().topicName(), message.fixedHeader().qosLevel().value(), message.payload(), message.fixedHeader().isRetain()); } void sendPubAck(int messageId) { @@ -228,107 +197,63 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { MqttFixedHeader fixedHeader = new MqttFixedHeader(messageType, false, qos, // Spec requires 01 in header for rel false, 0); MqttPubAckMessage rel = new MqttPubAckMessage(fixedHeader, MqttMessageIdVariableHeader.from(messageId)); - - this.protocolManager.invokeOutgoing(rel, this.connection); - - ctx.write(rel); - ctx.flush(); + sendToClient(rel); } void handlePuback(MqttPubAckMessage message) throws Exception { - this.protocolManager.invokeIncoming(message, this.connection); - - session.getMqttPublishManager().handlePubAck(message.variableHeader().messageId()); + session.getMqttPublishManager().handlePubAck(getMessageId(message)); } void handlePubrec(MqttMessage message) throws Exception { - this.protocolManager.invokeIncoming(message, this.connection); - - int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId(); - session.getMqttPublishManager().handlePubRec(messageId); + session.getMqttPublishManager().handlePubRec(getMessageId(message)); } void handlePubrel(MqttMessage message) { - this.protocolManager.invokeIncoming(message, this.connection); - - int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId(); - session.getMqttPublishManager().handlePubRel(messageId); + session.getMqttPublishManager().handlePubRel(getMessageId(message)); } void handlePubcomp(MqttMessage message) throws Exception { - this.protocolManager.invokeIncoming(message, this.connection); - - int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId(); - session.getMqttPublishManager().handlePubComp(messageId); + session.getMqttPublishManager().handlePubComp(getMessageId(message)); } - void handleSubscribe(MqttSubscribeMessage message, ChannelHandlerContext ctx) throws Exception { - this.protocolManager.invokeIncoming(message, this.connection); - + void handleSubscribe(MqttSubscribeMessage message) throws Exception { MQTTSubscriptionManager subscriptionManager = session.getSubscriptionManager(); int[] qos = subscriptionManager.addSubscriptions(message.payload().topicSubscriptions()); MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttSubAckMessage ack = new MqttSubAckMessage(header, message.variableHeader(), new MqttSubAckPayload(qos)); - MQTTUtil.logMessage(session.getSessionState(), ack, false); - ctx.write(ack); - ctx.flush(); - } - - void handleSuback(MqttSubAckMessage message) { - disconnect(true); + sendToClient(ack); } void handleUnsubscribe(MqttUnsubscribeMessage message) throws Exception { - this.protocolManager.invokeIncoming(message, this.connection); - session.getSubscriptionManager().removeSubscriptions(message.payload().topics()); MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttUnsubAckMessage m = new MqttUnsubAckMessage(header, message.variableHeader()); - MQTTUtil.logMessage(session.getSessionState(), m, false); - ctx.write(m); - ctx.flush(); + sendToClient(m); } - void handleUnsuback(MqttUnsubAckMessage message) { - this.protocolManager.invokeOutgoing(message, this.connection); - - disconnect(true); - } - - void handlePingreq(MqttMessage message, ChannelHandlerContext ctx) { - this.protocolManager.invokeIncoming(message, this.connection); - + void handlePingreq() { MqttMessage pingResp = new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0)); - MQTTUtil.logMessage(session.getSessionState(), pingResp, false); - ctx.write(pingResp); - ctx.flush(); + sendToClient(pingResp); } - void handlePingresp(MqttMessage message) { - disconnect(true); - } - - void handleDisconnect(MqttMessage message) { - this.protocolManager.invokeIncoming(message, this.connection); - - disconnect(false); - } - - protected int send(int messageId, String topicName, int qosLevel, boolean isRetain, ByteBuf payload, int deliveryCount) { + protected void send(int messageId, String topicName, int qosLevel, boolean isRetain, ByteBuf payload, int deliveryCount) { boolean redelivery = qosLevel == 0 ? false : (deliveryCount > 0); MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qosLevel), isRetain, 0); MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topicName, messageId); MqttMessage publish = new MqttPublishMessage(header, varHeader, payload); + sendToClient(publish); + } - this.protocolManager.invokeOutgoing(publish, connection); - - MQTTUtil.logMessage(session.getSessionState(), publish, false); - - ctx.write(publish); + private void sendToClient(MqttMessage message) { + MQTTUtil.logMessage(session.getSessionState(), message, false); + this.protocolManager.invokeOutgoing(message, connection); + ctx.write(message); ctx.flush(); + } - return 1; + private int getMessageId(MqttMessage message) { + return ((MqttMessageIdVariableHeader) message.variableHeader()).messageId(); } ActiveMQServer getServer() { diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java index c96beba6c0..73dbeaa4b5 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java @@ -90,7 +90,7 @@ public class MQTTSession { // TODO ensure resources are cleaned up for GC. synchronized void stop() throws Exception { if (!stopped) { - protocolHandler.stop(false); + protocolHandler.stop(); subscriptionManager.stop(); mqttPublishManager.stop();