To run the example, simply type mvn verify from this directory, or mvn -PnoServer verify if you want to start and create the server manually.
+
+
+
This example shows you how to implement and configure a simple incoming, server-side MQTT interceptor with ActiveMQ Artemis.
+
+
ActiveMQ Artemis allows an application to use an interceptor to hook into the messaging system. All that needs to do is to implement the
+ Interceptor interface, as defined below:
With interceptor, you can handle various events in message processing. In this example, a simple interceptor, SimpleMQTTInterceptor, is implemented and configured.
+ When the example is running, the interceptor will modify the payload of a sample MQTT message.
+
+
With our interceptor we always return true from the intercept method. If we were
+ to return false that signifies that no more interceptors are to run or the target
+ is not to be called. Return false to abort processing of the packet.
+
+
+
+
+
+
+
+
diff --git a/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java b/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java
new file mode 100644
index 0000000000..5926553031
--- /dev/null
+++ b/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.mqtt.example;
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
+
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.Message;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+
+/**
+ * A simple example that shows how to implement and use interceptors with ActiveMQ Artemis with the MQTT protocol.
+ */
+public class InterceptorExample {
+ public static void main(final String[] args) throws Exception {
+
+ System.out.println("Connecting to Artemis using MQTT");
+ MQTT mqtt = new MQTT();
+ mqtt.setHost("tcp://localhost:1883");
+
+ BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+
+ System.out.println("Connected to Artemis");
+
+ // Subscribe to a topic
+ Topic[] topics = {new Topic("mqtt/example/interceptor", QoS.AT_LEAST_ONCE)};
+ connection.subscribe(topics);
+ System.out.println("Subscribed to topics.");
+
+ // Publish message
+ String payload1 = "This is message 1";
+
+ connection.publish("mqtt/example/interceptor", payload1.getBytes(), QoS.AT_LEAST_ONCE, false);
+
+ System.out.println("Sent message");
+
+ // Receive the sent message
+ Message message1 = connection.receive(5, TimeUnit.SECONDS);
+
+ String messagePayload = new String(message1.getPayload(), StandardCharsets.UTF_8);
+
+ System.out.println("Received message: " + messagePayload);
+ }
+}
diff --git a/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java b/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java
new file mode 100644
index 0000000000..677328c73b
--- /dev/null
+++ b/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.mqtt.example;
+
+import java.nio.charset.Charset;
+
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
+
+
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+
+import io.netty.handler.codec.mqtt.MqttMessage;
+
+
+/**
+ * A simple Interceptor implementation
+ */
+public class SimpleMQTTInterceptor implements MQTTInterceptor {
+
+ @Override
+ public boolean intercept(final MqttMessage mqttMessage, RemotingConnection connection) {
+ System.out.println("MQTT Interceptor gets called ");
+
+
+ if (mqttMessage instanceof MqttPublishMessage) {
+ MqttPublishMessage message = (MqttPublishMessage) mqttMessage;
+
+
+ String originalMessage = message.payload().toString(Charset.forName("UTF-8"));
+ System.out.println("Original message: " + originalMessage);
+
+ // The new message content must not be bigger that the original content.
+ String modifiedMessage = "Modified message ";
+
+ message.payload().setBytes(0, modifiedMessage.getBytes());
+ }
+
+
+ // We return true which means "call next interceptor" (if there is one) or target.
+ // If we returned false, it means "abort call" - no more interceptors would be called and neither would
+ // the target
+ return true;
+ }
+
+}
diff --git a/examples/features/standard/interceptor-client-mqtt/src/main/resources/activemq/server0/broker.xml b/examples/features/standard/interceptor-client-mqtt/src/main/resources/activemq/server0/broker.xml
new file mode 100644
index 0000000000..f93a4045e8
--- /dev/null
+++ b/examples/features/standard/interceptor-client-mqtt/src/main/resources/activemq/server0/broker.xml
@@ -0,0 +1,176 @@
+
+
+
+
+
+
+
+ 0.0.0.0
+
+ true
+
+
+ ASYNCIO
+
+ ./data/paging
+
+
+ org.apache.activemq.artemis.mqtt.example.SimpleMQTTInterceptor
+
+
+ ./data/bindings
+
+ ./data/journal
+
+ ./data/large-messages
+
+ true
+
+ 2
+
+ -1
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ 5000
+
+
+ 90
+
+
+ 100Mb
+
+
+
+ tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE
+
+
+
+ tcp://0.0.0.0:5672?protocols=AMQP
+
+
+ tcp://0.0.0.0:61613?protocols=STOMP
+
+
+ tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP
+
+
+ tcp://0.0.0.0:1883?protocols=MQTT
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ DLQ
+ ExpiryQueue
+ 0
+
+ -1
+ 10
+ PAGE
+ true
+ true
+ true
+ true
+
+
+
+ DLQ
+ ExpiryQueue
+ 0
+
+ -1
+ 10
+ PAGE
+ true
+ true
+ true
+ true
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/examples/features/standard/pom.xml b/examples/features/standard/pom.xml
index 1fe7123df7..4adb26bbb5 100644
--- a/examples/features/standard/pom.xml
+++ b/examples/features/standard/pom.xml
@@ -56,6 +56,7 @@ under the License.
http-transportinterceptorinterceptor-client
+ interceptor-client-mqttinstantiate-connection-factoryjms-auto-closeablejms-bridge
@@ -118,6 +119,7 @@ under the License.
http-transportinterceptorinterceptor-client
+ interceptor-client-mqttjms-auto-closeableinstantiate-connection-factoryjms-bridge