diff --git a/artemis-distribution/src/test/scripts/run-examples.sh b/artemis-distribution/src/test/scripts/run-examples.sh
index e5f5f0065c..71bd398249 100755
--- a/artemis-distribution/src/test/scripts/run-examples.sh
+++ b/artemis-distribution/src/test/scripts/run-examples.sh
@@ -33,6 +33,7 @@ cd $ARTEMIS_HOME/examples/features/standard/
cd bridge; mvn verify; cd ..
cd bridge; mvn verify; cd ..
cd browser; mvn verify; cd ..
+cd broker-plugin; mvn verify; cd ..
cd cdi; mvn verify; cd ..
cd client-kickoff; mvn verify; cd ..
cd consumer-rate-limit; mvn verify; cd ..
diff --git a/artemis-distribution/src/test/scripts/run-standard-examples.sh b/artemis-distribution/src/test/scripts/run-standard-examples.sh
index 86705a7c5e..677d029d9b 100755
--- a/artemis-distribution/src/test/scripts/run-standard-examples.sh
+++ b/artemis-distribution/src/test/scripts/run-standard-examples.sh
@@ -33,6 +33,7 @@ cd $ARTEMIS_HOME/examples/features/standard/
cd bridge; mvn verify; cd ..
cd bridge; mvn verify; cd ..
cd browser; mvn verify; cd ..
+cd broker-plugin; mvn verify; cd ..
cd client-kickoff; mvn verify; cd ..
cd consumer-rate-limit; mvn verify; cd ..
cd dead-letter; mvn verify; cd ..
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index eb8edb53f5..d8b67ea2e3 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -18,13 +18,15 @@ package org.apache.activemq.artemis.protocol.amqp.broker;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
-import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
@@ -57,10 +59,6 @@ import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.Unpooled;
-
// see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
public class AMQPMessage extends RefCountMessage {
@@ -164,10 +162,11 @@ public class AMQPMessage extends RefCountMessage {
}
if (map == null) {
- return Collections.emptyMap();
- } else {
- return map;
+ map = new HashMap<>();
+ this.applicationProperties = new ApplicationProperties(map);
}
+
+ return map;
}
private ApplicationProperties getApplicationProperties() {
diff --git a/docs/user-manual/en/using-AMQP.md b/docs/user-manual/en/using-AMQP.md
index 51967f1b3e..22c0cfbfb8 100644
--- a/docs/user-manual/en/using-AMQP.md
+++ b/docs/user-manual/en/using-AMQP.md
@@ -52,6 +52,23 @@ We have a few examples as part of the Artemis distribution:
- Java (Using the qpid JMS Client)
* ./examples/protocols/amqp/queue
+
+- Interceptors
+ * ./examples/features/standard/broker-plugin
+
+
+ # Intercepting and changing messages
+
+ We don't recommend changing messages at the server's side for a few reasons:
+
+ - AMQPMessages are meant to be immutable
+ - The message won't be the original message the user sent
+ - AMQP has the possibility of signing messages. The signature would be broken.
+ - For performance reasons. We try not to re-encode (or even decode) messages.
+
+If regardless these recommendations you still need and want to intercept and change AMQP Messages, look at the example under ./examples/features/standard/broker-plugin.
+
+This example will send AMQP Message and modify properties before they reach the journals and are sent to the consumers.
diff --git a/examples/features/standard/broker-plugin/pom.xml b/examples/features/standard/broker-plugin/pom.xml
new file mode 100644
index 0000000000..b3be8b7f0b
--- /dev/null
+++ b/examples/features/standard/broker-plugin/pom.xml
@@ -0,0 +1,126 @@
+
+
+
+
To run the example, simply type mvn verify from this directory,+ +
or mvn -PnoServer verify if you want to start and create the server manually.
AMQP Messages should be by definition immutable at the server's. So, we don't recommend changing message contents. + However if you require you can use this example as a basis for adding properties on making changes
+ + diff --git a/examples/features/standard/broker-plugin/src/main/java/org/apache/activemq/artemis/jms/example/BrokerPlugin.java b/examples/features/standard/broker-plugin/src/main/java/org/apache/activemq/artemis/jms/example/BrokerPlugin.java new file mode 100644 index 0000000000..bd816a640c --- /dev/null +++ b/examples/features/standard/broker-plugin/src/main/java/org/apache/activemq/artemis/jms/example/BrokerPlugin.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.jms.example; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.core.server.ServerSession; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; +import org.apache.activemq.artemis.core.transaction.Transaction; + +public class BrokerPlugin implements ActiveMQServerPlugin { + + int count = 0; + + @Override + public void beforeSend(ServerSession session, + Transaction tx, + Message message, + boolean direct, + boolean noAutoCreateQueue) throws ActiveMQException { + message.putStringProperty("count", "" + (count++)); + // if you don't reencode, the message changes won't reach the clients or the journal + // (even if you don't need the properties sent to the client, + // a reload of messages from journal won't have these changes in) + message.reencode(); + } + +} diff --git a/examples/features/standard/broker-plugin/src/main/java/org/apache/activemq/artemis/jms/example/BrokerPluginExample.java b/examples/features/standard/broker-plugin/src/main/java/org/apache/activemq/artemis/jms/example/BrokerPluginExample.java new file mode 100644 index 0000000000..d91846a310 --- /dev/null +++ b/examples/features/standard/broker-plugin/src/main/java/org/apache/activemq/artemis/jms/example/BrokerPluginExample.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jms.example; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.jms.client.ActiveMQQueue; +import org.apache.qpid.jms.JmsConnectionFactory; + +/** + * A simple example which shows how to use a QueueBrowser to look at messages of a queue without removing them from the queue + */ +public class BrokerPluginExample { + + public static void main(final String[] args) throws Exception { + + // This example will send and receive an AMQP message + sendConsumeAMQP(); + + // And it will also send and receive a Core message + sendConsumeCore(); + } + + private static void sendConsumeAMQP() throws JMSException { + Connection connection = null; + ConnectionFactory connectionFactory = new JmsConnectionFactory("amqp://localhost:5672"); + + try { + + // Create an amqp qpid 1.0 connection + connection = connectionFactory.createConnection(); + + // Create a session + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create a sender + Queue queue = session.createQueue("exampleQueue"); + MessageProducer sender = session.createProducer(queue); + + // send a few simple message + sender.send(session.createTextMessage("Hello world ")); + + connection.start(); + + // create a moving receiver, this means the message will be removed from the queue + MessageConsumer consumer = session.createConsumer(queue); + + // receive the simple message + TextMessage m = (TextMessage) consumer.receive(5000); + + if (m.getStringProperty("count") == null) { + throw new RuntimeException(("missed property count")); + } + + System.out.println("message = " + m.getText() + " property count (added by interceptor = " + m.getStringProperty("count") + ")"); + + } finally { + if (connection != null) { + // close the connection + connection.close(); + } + } + } + + + private static void sendConsumeCore() throws JMSException { + Connection connection = null; + try { + // Perform a lookup on the Connection Factory + ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); + + Queue queue = new ActiveMQQueue("exampleQueue"); + + // Create a JMS Connection + connection = cf.createConnection(); + + // Create a JMS Session + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create a JMS Message Producer + MessageProducer producer = session.createProducer(queue); + + // Create a Text Message + TextMessage message = session.createTextMessage("This is a text message"); + + // Send the Message + producer.send(message); + + // Create a JMS Message Consumer + MessageConsumer messageConsumer = session.createConsumer(queue); + + // Start the Connection + connection.start(); + + // Receive the message + TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000); + + if (messageReceived.getStringProperty("count") == null) { + throw new RuntimeException(("missed property count")); + } + + System.out.println("message = " + messageReceived.getText() + " property count (added by interceptor = " + messageReceived.getStringProperty("count") + ")"); + + } finally { + if (connection != null) { + connection.close(); + } + } + + } +} diff --git a/examples/features/standard/broker-plugin/src/main/resources/activemq/server0/broker.xml b/examples/features/standard/broker-plugin/src/main/resources/activemq/server0/broker.xml new file mode 100644 index 0000000000..4ac665c1f5 --- /dev/null +++ b/examples/features/standard/broker-plugin/src/main/resources/activemq/server0/broker.xml @@ -0,0 +1,199 @@ + + + +