ARTEMIS-1169 - Implement Interceptors for the AMQP protocol

https://issues.apache.org/jira/browse/ARTEMIS-1169
This commit is contained in:
Andy Taylor 2017-05-17 09:28:44 +01:00 committed by Justin Bertram
parent 88cf826dca
commit d17ef14c90
14 changed files with 602 additions and 19 deletions

View File

@ -250,4 +250,12 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
}
return null;
}
public void invokeIncomingInterceptors(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {
manager.invokeIncoming(message, connection);
}
public void invokeOutgoingInterceptors(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {
manager.invokeOutgoing(message, connection);
}
}

View File

@ -461,7 +461,7 @@ public class AMQPSessionCallback implements SessionCallback {
final Delivery delivery,
final Receiver receiver) throws Exception {
message.setConnectionID(receiver.getSession().getConnection().getRemoteContainer());
invokeIncoming((AMQPMessage) message, (ActiveMQProtonRemotingConnection) transportConnection.getProtocolConnection());
serverSession.send(transaction, message, false, false);
afterIO(new IOCallback() {
@ -660,4 +660,12 @@ public class AMQPSessionCallback implements SessionCallback {
public void check(SimpleString address, CheckType checkType, SecurityAuth session) throws Exception {
manager.getServer().getSecurityStore().check(address, checkType, session);
}
public void invokeIncoming(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {
protonSPI.invokeIncomingInterceptors(message, connection);
}
public void invokeOutgoing(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {
protonSPI.invokeOutgoingInterceptors(message, connection);
}
}

View File

@ -0,0 +1,23 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.broker;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
public interface AmqpInterceptor extends BaseInterceptor<AMQPMessage> {
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.protocol.amqp.broker;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@ -25,7 +26,6 @@ import java.util.concurrent.Executor;
import io.netty.channel.ChannelPipeline;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
@ -36,8 +36,8 @@ import org.apache.activemq.artemis.core.server.management.NotificationListener;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants;
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
@ -46,10 +46,13 @@ import org.apache.activemq.artemis.spi.core.remoting.Connection;
/**
* A proton protocol manager, basically reads the Proton Input and maps proton resources to ActiveMQ Artemis resources
*/
public class ProtonProtocolManager implements ProtocolManager<Interceptor>, NotificationListener {
public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage, AmqpInterceptor, ActiveMQProtonRemotingConnection> implements NotificationListener {
private static final List<String> websocketRegistryNames = Arrays.asList("amqp");
private final List<AmqpInterceptor> incomingInterceptors = new ArrayList<>();
private final List<AmqpInterceptor> outgoingInterceptors = new ArrayList<>();
private final ActiveMQServer server;
private final ProtonProtocolManagerFactory factory;
@ -69,9 +72,10 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
private int maxFrameSize = AMQPConstants.Connection.DEFAULT_MAX_FRAME_SIZE;
public ProtonProtocolManager(ProtonProtocolManagerFactory factory, ActiveMQServer server) {
public ProtonProtocolManager(ProtonProtocolManagerFactory factory, ActiveMQServer server, List<BaseInterceptor> incomingInterceptors, List<BaseInterceptor> outgoingInterceptors) {
this.factory = factory;
this.server = server;
this.updateInterceptors(incomingInterceptors, outgoingInterceptors);
}
public ActiveMQServer getServer() {
@ -84,14 +88,17 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
}
@Override
public ProtocolManagerFactory<Interceptor> getFactory() {
public ProtocolManagerFactory<AmqpInterceptor> getFactory() {
return factory;
}
@Override
public void updateInterceptors(List<BaseInterceptor> incomingInterceptors,
List<BaseInterceptor> outgoingInterceptors) {
// no op
public void updateInterceptors(List incoming, List outgoing) {
this.incomingInterceptors.clear();
this.incomingInterceptors.addAll(getFactory().filterInterceptors(incoming));
this.outgoingInterceptors.clear();
this.outgoingInterceptors.addAll(getFactory().filterInterceptors(outgoing));
}
@Override
@ -207,4 +214,12 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
public Map<SimpleString, RoutingType> getPrefixes() {
return prefixes;
}
public void invokeIncoming(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {
super.invokeInterceptors(this.incomingInterceptors, message, connection);
}
public void invokeOutgoing(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {
super.invokeInterceptors(this.outgoingInterceptors, message, connection);
}
}

View File

@ -16,12 +16,10 @@
*/
package org.apache.activemq.artemis.protocol.amqp.broker;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -32,7 +30,7 @@ import org.apache.activemq.artemis.utils.uri.BeanSupport;
import org.osgi.service.component.annotations.Component;
@Component(service = ProtocolManagerFactory.class)
public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory<Interceptor> {
public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory<AmqpInterceptor> {
public static final byte ID = 2;
@ -57,13 +55,12 @@ public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory
final Map<String, Object> parameters,
List<BaseInterceptor> incomingInterceptors,
List<BaseInterceptor> outgoingInterceptors) throws Exception {
return BeanSupport.setData(new ProtonProtocolManager(this, server), parameters);
return BeanSupport.setData(new ProtonProtocolManager(this, server, incomingInterceptors, outgoingInterceptors), parameters);
}
@Override
public List<Interceptor> filterInterceptors(List<BaseInterceptor> interceptors) {
// no interceptors on Proton
return Collections.emptyList();
public List<AmqpInterceptor> filterInterceptors(List<BaseInterceptor> interceptors) {
return internalFilterInterceptors(AmqpInterceptor.class, interceptors);
}
@Override

View File

@ -29,6 +29,7 @@ import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
import org.apache.activemq.artemis.spi.core.remoting.TopologyResponseHandler;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.Lock;
@ -40,7 +41,7 @@ import java.util.concurrent.locks.Lock;
public class ProtonClientProtocolManager extends ProtonProtocolManager implements ClientProtocolManager {
public ProtonClientProtocolManager(ProtonProtocolManagerFactory factory, ActiveMQServer server) {
super(factory, server);
super(factory, server, Collections.emptyList(), Collections.emptyList());
}
@Override

View File

@ -0,0 +1,121 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq.examples.broker</groupId>
<artifactId>jms-examples</artifactId>
<version>2.2.0-SNAPSHOT</version>
</parent>
<artifactId>interceptor-client-amqp</artifactId>
<packaging>jar</packaging>
<name>ActiveMQ Artemis AMQP Interceptor Example</name>
<properties>
<activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-amqp-protocol</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
<version>${qpid.jms.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-maven-plugin</artifactId>
<executions>
<execution>
<id>create</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<libList><arg>org.apache.activemq.examples.broker:interceptor-client-amqp:${project.version}</arg></libList>
<ignore>${noServer}</ignore>
<configuration>${basedir}/target/classes/activemq/server0</configuration>
</configuration>
</execution>
<execution>
<id>start</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<spawn>true</spawn>
<testURI>tcp://localhost:61616</testURI>
<args>
<param>run</param>
</args>
</configuration>
</execution>
<execution>
<id>runClient</id>
<goals>
<goal>runClient</goal>
</goals>
<configuration>
<clientClass>org.apache.activemq.artemis.amqp.example.InterceptorExample</clientClass>
</configuration>
</execution>
<execution>
<id>stop</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<args>
<param>stop</param>
</args>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>org.apache.activemq.examples.broker</groupId>
<artifactId>interceptor-client-amqp</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,71 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<html>
<head>
<title>ActiveMQ Artemis AMQP Interceptor Example</title>
<link rel="stylesheet" type="text/css" href="../../../common/common.css" />
<link rel="stylesheet" type="text/css" href="../../../common/prettify.css" />
<script type="text/javascript" src="../../../common/prettify.js"></script>
</head>
<body onload="prettyPrint()">
<h1>AMQP Interceptor Example</h1>
<pre>To run the example, simply type <b>mvn verify</b> from this directory, <br>or <b>mvn -PnoServer verify</b> if you want to start and create the server manually.</pre>
<p>This example shows you how to implement and configure a simple incoming, server-side AMQP interceptor with ActiveMQ Artemis.</p>
<p>ActiveMQ Artemis allows an application to use an interceptor to hook into the messaging system. All that needs to do is to implement the
Interceptor interface, as defined below: </p>
<pre class="prettyprint">
<code>
public interface AmqpInterceptor
{
boolean intercept(final AMQPMessage message, RemotingConnection connection);
}
</code>
</pre>
<p>Once you have your own interceptor class, add it to the broker.xml, as follows:</p>
<pre class="prettyprint">
<code>
&lt;configuration&gt;
...
&lt;remoting-incoming-interceptors&gt;
&lt;class-name&gt;org.apache.activemq.artemis.amqp.example.SimpleAMQPInterceptor&lt;/class-name&gt;
&lt;/remoting-incoming-interceptors&gt;
...
&lt;/configuration&gt;
</code>
</pre>
<p>With interceptor, you can handle various events in message processing. In this example, a simple interceptor, SimpleAMQPInterceptor, is implemented and configured.
When the example is running, the interceptor examine and log properties of the AMQP message.</p>
<p>With our interceptor we always return <code>true</code> from the <code>intercept</code> method. If we were
to return <code>false</code> that signifies that no more interceptors are to run.
Throw an exception to abort processing of the packet.</p>
</body>
</html>

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.amqp.example;
import org.apache.qpid.jms.JmsConnectionFactory;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
* A simple example that shows how to implement and use interceptors with ActiveMQ Artemis with the AMQP protocol.
*/
public class InterceptorExample {
public static void main(final String[] args) throws Exception {
JmsConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616");
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue interceptorQueue = session.createQueue("interceptorQueue");
MessageProducer producer = session.createProducer(interceptorQueue);
TextMessage textMessage = session.createTextMessage("A text message");
textMessage.setStringProperty("SimpleAmqpInterceptor", "SimpleAmqpInterceptorValue");
producer.send(textMessage);
}
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.amqp.example;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AmqpInterceptor;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
/**
* A simple Interceptor implementation
*/
public class SimpleAmqpInterceptor implements AmqpInterceptor {
@Override
public boolean intercept(final AMQPMessage message, RemotingConnection connection) {
System.out.println("AMQP Interceptor gets called with message " + message.getMessageID());
String val = message.getStringProperty("SimpleAmqpInterceptor");
System.out.println("intercepted message with property " + val);
return true;
}
}

View File

@ -0,0 +1,176 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq:core ">
<name>0.0.0.0</name>
<persistence-enabled>true</persistence-enabled>
<!-- this could be ASYNCIO or NIO
-->
<journal-type>ASYNCIO</journal-type>
<paging-directory>./data/paging</paging-directory>
<remoting-incoming-interceptors>
<class-name>org.apache.activemq.artemis.amqp.example.SimpleAmqpInterceptor</class-name>
</remoting-incoming-interceptors>
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/large-messages</large-messages-directory>
<journal-datasync>true</journal-datasync>
<journal-min-files>2</journal-min-files>
<journal-pool-files>-1</journal-pool-files>
<!--
You can specify the NIC you want to use to verify if the network
<network-check-NIC>theNickName</network-check-NIC>
-->
<!--
Use this to use an HTTP server to validate the network
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
<!-- <network-check-period>10000</network-check-period> -->
<!-- <network-check-timeout>1000</network-check-timeout> -->
<!-- this is a comma separated list, no spaces, just DNS or IPs
it should accept IPV6
Warning: Make sure you understand your network topology as this is meant to validate if your network is valid.
Using IPs that could eventually disappear or be partially visible may defeat the purpose.
You can use a list of multiple IPs, and if any successful ping will make the server OK to continue running -->
<!-- <network-check-list>10.0.0.1</network-check-list> -->
<!-- use this to customize the ping used for ipv4 addresses -->
<!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->
<!-- use this to customize the ping used for ipv6 addresses -->
<!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
<disk-scan-period>5000</disk-scan-period>
<!-- once the disk hits this limit the system will block, or close the connection in certain protocols
that won't support flow control. -->
<max-disk-usage>90</max-disk-usage>
<!-- the system will enter into page mode once you hit this limit.
This is an estimate in bytes of how much the messages are using in memory -->
<global-max-size>100Mb</global-max-size>
<acceptors>
<!-- Acceptor for every supported protocol -->
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE</acceptor>
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://0.0.0.0:5672?protocols=AMQP</acceptor>
<!-- STOMP Acceptor. -->
<acceptor name="stomp">tcp://0.0.0.0:61613?protocols=STOMP</acceptor>
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
<acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP</acceptor>
<!-- MQTT Acceptor -->
<acceptor name="mqtt">tcp://0.0.0.0:1883?protocols=MQTT</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
<permission type="createDurableQueue" roles="guest"/>
<permission type="deleteDurableQueue" roles="guest"/>
<permission type="createAddress" roles="guest"/>
<permission type="deleteAddress" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="browse" roles="guest"/>
<permission type="send" roles="guest"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="guest"/>
</security-setting>
</security-settings>
<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
</address-settings>
<addresses>
<address name="DLQ">
<anycast>
<queue name="DLQ" />
</anycast>
</address>
<address name="ExpiryQueue">
<anycast>
<queue name="ExpiryQueue" />
</anycast>
</address>
</addresses>
</core>
</configuration>

View File

@ -59,8 +59,8 @@ under the License.
When the example is running, the interceptor will modify the payload of a sample MQTT message.</p>
<p>With our interceptor we always return <code>true</code> from the <code>intercept</code> method. If we were
to return <code>false</code> that signifies that no more interceptors are to run or the target
is not to be called. Return <code>false</code> to abort processing of the packet.</p>
to return <code>false</code> that signifies that no more interceptors are to run.
Throw an exception to abort processing of the packet.</p>
</body>
</html>

View File

@ -57,6 +57,7 @@ under the License.
<module>interceptor</module>
<module>interceptor-client</module>
<module>interceptor-client-mqtt</module>
<module>interceptor-client-amqp</module>
<module>instantiate-connection-factory</module>
<module>jms-auto-closeable</module>
<module>jms-bridge</module>
@ -120,6 +121,7 @@ under the License.
<module>interceptor</module>
<module>interceptor-client</module>
<module>interceptor-client-mqtt</module>
<module>interceptor-client-amqp</module>
<module>jms-auto-closeable</module>
<module>instantiate-connection-factory</module>
<module>jms-bridge</module>

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AmqpInterceptor;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* Test basic send and receive scenarios using only AMQP sender and receiver links.
*/
public class AmqpSendReceiveInterceptorTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testCreateQueueReceiver() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
server.getRemotingService().addIncomingInterceptor(new AmqpInterceptor() {
@Override
public boolean intercept(AMQPMessage message, RemotingConnection connection) throws ActiveMQException {
latch.countDown();
return true;
}
});
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpMessage message = new AmqpMessage();
message.setMessageId("msg" + 1);
message.setText("Test-Message");
sender.send(message);
assertTrue(latch.await(5, TimeUnit.SECONDS));
final CountDownLatch latch2 = new CountDownLatch(1);
server.getRemotingService().addOutgoingInterceptor(new AmqpInterceptor() {
@Override
public boolean intercept(AMQPMessage packet, RemotingConnection connection) throws ActiveMQException {
latch2.countDown();
return true;
}
});
AmqpReceiver receiver = session.createReceiver(getTestName());
receiver.flow(2);
AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
assertEquals(latch.getCount(), 0);
receiver.close();
connection.close();
}
}