ARTEMIS-1461 Allow changing of messages through interceptors

This commit is contained in:
Clebert Suconic 2017-10-11 11:30:22 -04:00
parent 239673b161
commit e31f447032
10 changed files with 563 additions and 8 deletions

View File

@ -33,6 +33,7 @@ cd $ARTEMIS_HOME/examples/features/standard/
cd bridge; mvn verify; cd ..
cd bridge; mvn verify; cd ..
cd browser; mvn verify; cd ..
cd broker-plugin; mvn verify; cd ..
cd cdi; mvn verify; cd ..
cd client-kickoff; mvn verify; cd ..
cd consumer-rate-limit; mvn verify; cd ..

View File

@ -33,6 +33,7 @@ cd $ARTEMIS_HOME/examples/features/standard/
cd bridge; mvn verify; cd ..
cd bridge; mvn verify; cd ..
cd browser; mvn verify; cd ..
cd broker-plugin; mvn verify; cd ..
cd client-kickoff; mvn verify; cd ..
cd consumer-rate-limit; mvn verify; cd ..
cd dead-letter; mvn verify; cd ..

View File

@ -18,13 +18,15 @@ package org.apache.activemq.artemis.protocol.amqp.broker;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
@ -57,10 +59,6 @@ import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
// see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
public class AMQPMessage extends RefCountMessage {
@ -164,10 +162,11 @@ public class AMQPMessage extends RefCountMessage {
}
if (map == null) {
return Collections.emptyMap();
} else {
return map;
map = new HashMap<>();
this.applicationProperties = new ApplicationProperties(map);
}
return map;
}
private ApplicationProperties getApplicationProperties() {

View File

@ -52,6 +52,23 @@ We have a few examples as part of the Artemis distribution:
- Java (Using the qpid JMS Client)
* ./examples/protocols/amqp/queue
- Interceptors
* ./examples/features/standard/broker-plugin
# Intercepting and changing messages
We don't recommend changing messages at the server's side for a few reasons:
- AMQPMessages are meant to be immutable
- The message won't be the original message the user sent
- AMQP has the possibility of signing messages. The signature would be broken.
- For performance reasons. We try not to re-encode (or even decode) messages.
If regardless these recommendations you still need and want to intercept and change AMQP Messages, look at the example under ./examples/features/standard/broker-plugin.
This example will send AMQP Message and modify properties before they reach the journals and are sent to the consumers.

View File

@ -0,0 +1,126 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq.examples.broker</groupId>
<artifactId>jms-examples</artifactId>
<version>2.4.0-SNAPSHOT</version>
</parent>
<artifactId>broker-plugin</artifactId>
<packaging>jar</packaging>
<name>ActiveMQ Artemis Broker Plugin Example</name>
<properties>
<activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jms-client-all</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-amqp-protocol</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
<version>${qpid.jms.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-maven-plugin</artifactId>
<executions>
<execution>
<id>create</id>
<phase>verify</phase>
<configuration>
<!-- The broker plugin will install this library on the server's classpath -->
<libList><arg>org.apache.activemq.examples.broker:broker-plugin:${project.version}</arg></libList>
<ignore>${noServer}</ignore>
</configuration>
<goals>
<goal>create</goal>
</goals>
</execution>
<execution>
<id>start</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<spawn>true</spawn>
<ignore>${noServer}</ignore>
<testURI>tcp://localhost:61616</testURI>
<args>
<param>run</param>
</args>
</configuration>
</execution>
<execution>
<id>runClient</id>
<goals>
<goal>runClient</goal>
</goals>
<configuration>
<clientClass>org.apache.activemq.artemis.jms.example.BrokerPluginExample</clientClass>
</configuration>
</execution>
<execution>
<id>stop</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<args>
<param>stop</param>
</args>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>org.apache.activemq.examples.broker</groupId>
<artifactId>broker-plugin</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,34 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<html>
<head>
<title>ActiveMQ Artemis JMS QueueBrowser Example</title>
<link rel="stylesheet" type="text/css" href="../../../common/common.css" />
<link rel="stylesheet" type="text/css" href="../../../common/prettify.css" />
<script type="text/javascript" src="../../../common/prettify.js"></script>
</head>
<body onload="prettyPrint()">
<h1>Broker Plugin Example</h1>
<pre>To run the example, simply type <b>mvn verify</b> from this directory, <br>or <b>mvn -PnoServer verify</b> if you want to start and create the server manually.</pre>
<p>AMQP Messages should be by definition immutable at the server's. So, we don't recommend changing message contents.
However if you require you can use this example as a basis for adding properties on making changes</p>
</body>
</html>

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jms.example;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.transaction.Transaction;
public class BrokerPlugin implements ActiveMQServerPlugin {
int count = 0;
@Override
public void beforeSend(ServerSession session,
Transaction tx,
Message message,
boolean direct,
boolean noAutoCreateQueue) throws ActiveMQException {
message.putStringProperty("count", "" + (count++));
// if you don't reencode, the message changes won't reach the clients or the journal
// (even if you don't need the properties sent to the client,
// a reload of messages from journal won't have these changes in)
message.reencode();
}
}

View File

@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jms.example;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
import org.apache.qpid.jms.JmsConnectionFactory;
/**
* A simple example which shows how to use a QueueBrowser to look at messages of a queue without removing them from the queue
*/
public class BrokerPluginExample {
public static void main(final String[] args) throws Exception {
// This example will send and receive an AMQP message
sendConsumeAMQP();
// And it will also send and receive a Core message
sendConsumeCore();
}
private static void sendConsumeAMQP() throws JMSException {
Connection connection = null;
ConnectionFactory connectionFactory = new JmsConnectionFactory("amqp://localhost:5672");
try {
// Create an amqp qpid 1.0 connection
connection = connectionFactory.createConnection();
// Create a session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create a sender
Queue queue = session.createQueue("exampleQueue");
MessageProducer sender = session.createProducer(queue);
// send a few simple message
sender.send(session.createTextMessage("Hello world "));
connection.start();
// create a moving receiver, this means the message will be removed from the queue
MessageConsumer consumer = session.createConsumer(queue);
// receive the simple message
TextMessage m = (TextMessage) consumer.receive(5000);
if (m.getStringProperty("count") == null) {
throw new RuntimeException(("missed property count"));
}
System.out.println("message = " + m.getText() + " property count (added by interceptor = " + m.getStringProperty("count") + ")");
} finally {
if (connection != null) {
// close the connection
connection.close();
}
}
}
private static void sendConsumeCore() throws JMSException {
Connection connection = null;
try {
// Perform a lookup on the Connection Factory
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
Queue queue = new ActiveMQQueue("exampleQueue");
// Create a JMS Connection
connection = cf.createConnection();
// Create a JMS Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create a JMS Message Producer
MessageProducer producer = session.createProducer(queue);
// Create a Text Message
TextMessage message = session.createTextMessage("This is a text message");
// Send the Message
producer.send(message);
// Create a JMS Message Consumer
MessageConsumer messageConsumer = session.createConsumer(queue);
// Start the Connection
connection.start();
// Receive the message
TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
if (messageReceived.getStringProperty("count") == null) {
throw new RuntimeException(("missed property count"));
}
System.out.println("message = " + messageReceived.getText() + " property count (added by interceptor = " + messageReceived.getStringProperty("count") + ")");
} finally {
if (connection != null) {
connection.close();
}
}
}
}

View File

@ -0,0 +1,199 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq:core ">
<name>0.0.0.0</name>
<persistence-enabled>true</persistence-enabled>
<!-- this could be ASYNCIO, MAPPED, NIO
ASYNCIO: Linux Libaio
MAPPED: mmap files
NIO: Plain Java Files
-->
<journal-type>NIO</journal-type>
<paging-directory>./data/paging</paging-directory>
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/large-messages</large-messages-directory>
<journal-datasync>true</journal-datasync>
<journal-min-files>2</journal-min-files>
<journal-pool-files>-1</journal-pool-files>
<journal-file-size>10M</journal-file-size>
<!--
You can verify the network health of a particular NIC by specifying the <network-check-NIC> element.
<network-check-NIC>theNicName</network-check-NIC>
-->
<!--
Use this to use an HTTP server to validate the network
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
<!-- <network-check-period>10000</network-check-period> -->
<!-- <network-check-timeout>1000</network-check-timeout> -->
<!-- this is a comma separated list, no spaces, just DNS or IPs
it should accept IPV6
Warning: Make sure you understand your network topology as this is meant to validate if your network is valid.
Using IPs that could eventually disappear or be partially visible may defeat the purpose.
You can use a list of multiple IPs, and if any successful ping will make the server OK to continue running -->
<!-- <network-check-list>10.0.0.1</network-check-list> -->
<!-- use this to customize the ping used for ipv4 addresses -->
<!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->
<!-- use this to customize the ping used for ipv6 addresses -->
<!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
<disk-scan-period>5000</disk-scan-period>
<!-- once the disk hits this limit the system will block, or close the connection in certain protocols
that won't support flow control. -->
<max-disk-usage>90</max-disk-usage>
<!-- should the broker detect dead locks and other issues -->
<critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
<!-- the system will enter into page mode once you hit this limit.
This is an estimate in bytes of how much the messages are using in memory
The system will use half of the available memory (-Xmx) by default for the global-max-size.
You may specify a different value here if you need to customize it to your needs.
<global-max-size>100Mb</global-max-size>
-->
<broker-plugins>
<broker-plugin class-name="org.apache.activemq.artemis.jms.example.BrokerPlugin"/>
</broker-plugins>
<acceptors>
<!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
<!-- amqpCredits: The number of credits sent to AMQP producers -->
<!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
<!-- Acceptor for every supported protocol -->
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300</acceptor>
<!-- STOMP Acceptor. -->
<acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
<acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
<!-- MQTT Acceptor -->
<acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
<permission type="createDurableQueue" roles="guest"/>
<permission type="deleteDurableQueue" roles="guest"/>
<permission type="createAddress" roles="guest"/>
<permission type="deleteAddress" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="browse" roles="guest"/>
<permission type="send" roles="guest"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="guest"/>
</security-setting>
</security-settings>
<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
</address-settings>
<addresses>
<address name="DLQ">
<anycast>
<queue name="DLQ" />
</anycast>
</address>
<address name="ExpiryQueue">
<anycast>
<queue name="ExpiryQueue" />
</anycast>
</address>
</addresses>
</core>
</configuration>

View File

@ -43,6 +43,7 @@ under the License.
<modules>
<module>bridge</module>
<module>browser</module>
<module>broker-plugin</module>
<module>cdi</module>
<module>client-kickoff</module>
<module>consumer-rate-limit</module>
@ -107,6 +108,7 @@ under the License.
<modules>
<module>bridge</module>
<module>browser</module>
<module>broker-plugin</module>
<module>cdi</module>
<module>client-kickoff</module>
<module>consumer-rate-limit</module>