From 20c67baa5af1391e8b58257190e25f59a5d9717a Mon Sep 17 00:00:00 2001 From: Otavio Rodolfo Piske Date: Mon, 20 Mar 2017 16:55:55 +0100 Subject: [PATCH] ARTEMIS-607 Added an example/verification for calling an interceptor when using MQTT protocol --- .../standard/interceptor-client-mqtt/pom.xml | 120 ++++++++++++ .../interceptor-client-mqtt/readme.html | 71 +++++++ .../mqtt/example/InterceptorExample.java | 62 ++++++ .../mqtt/example/SimpleMQTTInterceptor.java | 60 ++++++ .../resources/activemq/server0/broker.xml | 176 ++++++++++++++++++ examples/features/standard/pom.xml | 2 + 6 files changed, 491 insertions(+) create mode 100644 examples/features/standard/interceptor-client-mqtt/pom.xml create mode 100644 examples/features/standard/interceptor-client-mqtt/readme.html create mode 100644 examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java create mode 100644 examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java create mode 100644 examples/features/standard/interceptor-client-mqtt/src/main/resources/activemq/server0/broker.xml diff --git a/examples/features/standard/interceptor-client-mqtt/pom.xml b/examples/features/standard/interceptor-client-mqtt/pom.xml new file mode 100644 index 0000000000..2e92dfc0c1 --- /dev/null +++ b/examples/features/standard/interceptor-client-mqtt/pom.xml @@ -0,0 +1,120 @@ + + + + + 4.0.0 + + + org.apache.activemq.examples.broker + jms-examples + 2.1.0-SNAPSHOT + + + interceptor-client-mqtt + jar + ActiveMQ Artemis MQTT Interceptor Example + + + ${project.basedir}/../../../.. + + + + + org.apache.activemq + artemis-mqtt-protocol + ${project.version} + + + + org.fusesource.mqtt-client + mqtt-client + + + + io.netty + netty-all + + + + + + + org.apache.activemq + artemis-maven-plugin + + + create + + create + + + org.apache.activemq.examples.broker:interceptor-client-mqtt:${project.version} + ${noServer} + ${basedir}/target/classes/activemq/server0 + + + + start + + cli + + + ${noServer} + true + tcp://localhost:61616 + + run + + + + + runClient + + runClient + + + org.apache.activemq.artemis.mqtt.example.InterceptorExample + + + + stop + + cli + + + ${noServer} + + stop + + + + + + + org.apache.activemq.examples.broker + interceptor-client-mqtt + ${project.version} + + + + + + + diff --git a/examples/features/standard/interceptor-client-mqtt/readme.html b/examples/features/standard/interceptor-client-mqtt/readme.html new file mode 100644 index 0000000000..7c49ca6eff --- /dev/null +++ b/examples/features/standard/interceptor-client-mqtt/readme.html @@ -0,0 +1,71 @@ + + + + + ActiveMQ Artemis JMS Interceptor Example + + + + + +

MQTT Interceptor Example

+ +
To run the example, simply type mvn verify from this directory, 
or mvn -PnoServer verify if you want to start and create the server manually.
+ + +

This example shows you how to implement and configure a simple incoming, server-side MQTT interceptor with ActiveMQ Artemis.

+ +

ActiveMQ Artemis allows an application to use an interceptor to hook into the messaging system. All that needs to do is to implement the + Interceptor interface, as defined below:

+
+     
+         public interface Interceptor
+         {
+            boolean intercept(final MqttMessage mqttMessage, RemotingConnection connection);
+         }
+     
+     
+

Once you have your own interceptor class, add it to the broker.xml, as follows:

+
+     
+        <configuration>
+        ...
+           <remoting-incoming-interceptors>
+              <class-name>org.apache.activemq.artemis.mqtt.example.SimpleMQTTInterceptor</class-name>
+           </remoting-incoming-interceptors>
+        ...
+        </configuration>
+     
+     
+ +

With interceptor, you can handle various events in message processing. In this example, a simple interceptor, SimpleMQTTInterceptor, is implemented and configured. + When the example is running, the interceptor will modify the payload of a sample MQTT message.

+ +

With our interceptor we always return true from the intercept method. If we were + to return false that signifies that no more interceptors are to run or the target + is not to be called. Return false to abort processing of the packet.

+ + + + + + + + diff --git a/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java b/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java new file mode 100644 index 0000000000..5926553031 --- /dev/null +++ b/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.mqtt.example; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; + +import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.MQTT; +import org.fusesource.mqtt.client.Message; +import org.fusesource.mqtt.client.QoS; +import org.fusesource.mqtt.client.Topic; + +/** + * A simple example that shows how to implement and use interceptors with ActiveMQ Artemis with the MQTT protocol. + */ +public class InterceptorExample { + public static void main(final String[] args) throws Exception { + + System.out.println("Connecting to Artemis using MQTT"); + MQTT mqtt = new MQTT(); + mqtt.setHost("tcp://localhost:1883"); + + BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + + System.out.println("Connected to Artemis"); + + // Subscribe to a topic + Topic[] topics = {new Topic("mqtt/example/interceptor", QoS.AT_LEAST_ONCE)}; + connection.subscribe(topics); + System.out.println("Subscribed to topics."); + + // Publish message + String payload1 = "This is message 1"; + + connection.publish("mqtt/example/interceptor", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + + System.out.println("Sent message"); + + // Receive the sent message + Message message1 = connection.receive(5, TimeUnit.SECONDS); + + String messagePayload = new String(message1.getPayload(), StandardCharsets.UTF_8); + + System.out.println("Received message: " + messagePayload); + } +} diff --git a/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java b/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java new file mode 100644 index 0000000000..677328c73b --- /dev/null +++ b/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.mqtt.example; + +import java.nio.charset.Charset; + +import io.netty.handler.codec.mqtt.MqttPublishMessage; +import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor; + + +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; + +import io.netty.handler.codec.mqtt.MqttMessage; + + +/** + * A simple Interceptor implementation + */ +public class SimpleMQTTInterceptor implements MQTTInterceptor { + + @Override + public boolean intercept(final MqttMessage mqttMessage, RemotingConnection connection) { + System.out.println("MQTT Interceptor gets called "); + + + if (mqttMessage instanceof MqttPublishMessage) { + MqttPublishMessage message = (MqttPublishMessage) mqttMessage; + + + String originalMessage = message.payload().toString(Charset.forName("UTF-8")); + System.out.println("Original message: " + originalMessage); + + // The new message content must not be bigger that the original content. + String modifiedMessage = "Modified message "; + + message.payload().setBytes(0, modifiedMessage.getBytes()); + } + + + // We return true which means "call next interceptor" (if there is one) or target. + // If we returned false, it means "abort call" - no more interceptors would be called and neither would + // the target + return true; + } + +} diff --git a/examples/features/standard/interceptor-client-mqtt/src/main/resources/activemq/server0/broker.xml b/examples/features/standard/interceptor-client-mqtt/src/main/resources/activemq/server0/broker.xml new file mode 100644 index 0000000000..f93a4045e8 --- /dev/null +++ b/examples/features/standard/interceptor-client-mqtt/src/main/resources/activemq/server0/broker.xml @@ -0,0 +1,176 @@ + + + + + + + + 0.0.0.0 + + true + + + ASYNCIO + + ./data/paging + + + org.apache.activemq.artemis.mqtt.example.SimpleMQTTInterceptor + + + ./data/bindings + + ./data/journal + + ./data/large-messages + + true + + 2 + + -1 + + + + + + + + + + + + + + + + + + + + + + 5000 + + + 90 + + + 100Mb + + + + tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE + + + + tcp://0.0.0.0:5672?protocols=AMQP + + + tcp://0.0.0.0:61613?protocols=STOMP + + + tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP + + + tcp://0.0.0.0:1883?protocols=MQTT + + + + + + + + + + + + + + + + + + + + + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + true + true + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + true + true + + + + +
+ + + +
+
+ + + +
+ +
+ +
+
diff --git a/examples/features/standard/pom.xml b/examples/features/standard/pom.xml index 1fe7123df7..4adb26bbb5 100644 --- a/examples/features/standard/pom.xml +++ b/examples/features/standard/pom.xml @@ -56,6 +56,7 @@ under the License. http-transport interceptor interceptor-client + interceptor-client-mqtt instantiate-connection-factory jms-auto-closeable jms-bridge @@ -118,6 +119,7 @@ under the License. http-transport interceptor interceptor-client + interceptor-client-mqtt jms-auto-closeable instantiate-connection-factory jms-bridge