ARTEMIS-3515 Adding example with multicast and broker connection

This commit is contained in:
Clebert Suconic 2021-10-05 12:01:00 -04:00 committed by clebertsuconic
parent 20445bef7c
commit d6237cb4d8
6 changed files with 518 additions and 0 deletions

View File

@ -0,0 +1,164 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq.examples.broker-connection</groupId>
<artifactId>broker-connections</artifactId>
<version>2.19.0-SNAPSHOT</version>
</parent>
<artifactId>amqp-sending-messages-multicast</artifactId>
<packaging>jar</packaging>
<name>amqp-sending-messages-queues</name>
<properties>
<activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-maven-plugin</artifactId>
<executions>
<execution>
<id>create0</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<instance>${basedir}/target/server0</instance>
<allowAnonymous>true</allowAnonymous>
<configuration>${basedir}/target/classes/activemq/server0</configuration>
<!-- this makes it easier in certain envs -->
<javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
</configuration>
</execution>
<execution>
<id>create1</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<instance>${basedir}/target/server1</instance>
<allowAnonymous>true</allowAnonymous>
<configuration>${basedir}/target/classes/activemq/server1</configuration>
<!-- this makes it easier in certain envs -->
<javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
</configuration>
</execution>
<!-- we first start broker 1, to avoid reconnecting statements -->
<execution>
<id>start1</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<spawn>true</spawn>
<location>${basedir}/target/server1</location>
<testURI>tcp://localhost:5771</testURI>
<args>
<param>run</param>
</args>
<name>server1</name>
</configuration>
</execution>
<execution>
<id>start0</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<spawn>true</spawn>
<ignore>${noServer}</ignore>
<location>${basedir}/target/server0</location>
<testURI>tcp://localhost:5660</testURI>
<args>
<param>run</param>
</args>
<name>server0</name>
</configuration>
</execution>
<execution>
<id>runClient</id>
<goals>
<goal>runClient</goal>
</goals>
<configuration>
<!-- you may have to set export MAVEN_OPTS="-Djava.net.preferIPv4Stack=true"
if you are on MacOS for instance -->
<clientClass>org.apache.activemq.artemis.jms.example.BrokerConnectionSender</clientClass>
</configuration>
</execution>
<execution>
<id>stop0</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<location>${basedir}/target/server0</location>
<args>
<param>stop</param>
</args>
</configuration>
</execution>
<execution>
<id>stop1</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<location>${basedir}/target/server1</location>
<args>
<param>stop</param>
</args>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>org.apache.activemq.examples.broker-connection</groupId>
<artifactId>amqp-sending-messages-multicast</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,9 @@
# AMQP Broker Connection with Senders on Multicast
To run the example, simply type **mvn verify** from this directory, or **mvn -PnoServer verify** if you want to create and start the broker manually.
This example demonstrates how you can create a broker connection from one broker towards another broker, and send messages from that broker towards the target server.
You basically configured the broker connection on broker.xml and this example will give you two working servers where you send messages in one broker and receive it on another broker.
This example will use a single multicast queue to distribute to another multicast address on another broker.

View File

@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jms.example;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.qpid.jms.JmsConnectionFactory;
/**
* This example is demonstrating how messages are transferred from one broker towards another broker
* through the sender operation on a AMQP Broker Connection.
*/
public class BrokerConnectionSender {
public static void main(final String[] args) throws Exception {
Connection connectionOnServer0 = null;
ConnectionFactory connectionFactoryServer0 = new JmsConnectionFactory("amqp://localhost:5660");
Connection connectionOnServer1 = null;
ConnectionFactory connectionFactoryServer1 = new JmsConnectionFactory("amqp://localhost:5771");
// to make the example more interesting I'm creating a durable subscription on server1 before we start the consumer on server0
// this subscription will be reconnected at the end after the sends
try {
connectionOnServer1 = connectionFactoryServer1.createConnection();
connectionOnServer1.setClientID("id1");
connectionOnServer1.start();
Session session = connectionOnServer1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("exampleTopic");
session.createDurableSubscriber(topic, "hello");
} finally {
connectionOnServer1.close();
}
try {
connectionOnServer0 = connectionFactoryServer0.createConnection();
Session session = connectionOnServer0.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("exampleTopic");
MessageProducer sender = session.createProducer(topic);
for (int i = 0; i < 100; i++) {
sender.send(session.createTextMessage("Hello world n" + i));
}
} finally {
if (connectionFactoryServer0 != null) {
connectionOnServer0.close();
}
}
connectionOnServer1 = null;
connectionFactoryServer1 = new JmsConnectionFactory("amqp://localhost:5771");
try {
connectionOnServer1 = connectionFactoryServer1.createConnection();
connectionOnServer1.setClientID("id1");
connectionOnServer1.start();
Session session = connectionOnServer1.createSession(false, Session.AUTO_ACKNOWLEDGE);
{
Queue queue = session.createQueue("exampleTopic::q2");
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < 100; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
System.out.println("Received message " + message.getText() + " on q2");
}
}
{
Queue queue = session.createQueue("exampleTopic::q3");
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < 100; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
System.out.println("Received message " + message.getText() + " on q3");
}
}
// Receiving messages using the topic subscription API
{
Topic topic = session.createTopic("exampleTopic");
MessageConsumer consumer = session.createDurableSubscriber(topic, "hello");
for (int i = 0; i < 100; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
System.out.println("Received message " + message.getText() + " on a topic subscription");
}
}
} finally {
if (connectionOnServer1 != null) {
connectionOnServer1.close();
}
}
}
}

View File

@ -0,0 +1,117 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xi="http://www.w3.org/2001/XInclude"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq:core ">
<name>0.0.0.0</name>
<persistence-enabled>false</persistence-enabled>
<journal-type>NIO</journal-type>
<!-- should the broker detect dead locks and other issues -->
<critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
<page-sync-timeout>44000</page-sync-timeout>
<acceptors>
<!-- Acceptor for every supported protocol -->
<acceptor name="artemis">tcp://0.0.0.0:5660?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
</acceptors>
<broker-connections>
<amqp-connection uri="tcp://localhost:5771" name="sender" retry-interval="100">
<!-- this will create a subscription channel between exampleTopic on this server and the other server -->
<sender queue-name="subscriptionExampleTopic"/>
</amqp-connection>
</broker-connections>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
<permission type="createDurableQueue" roles="guest"/>
<permission type="deleteDurableQueue" roles="guest"/>
<permission type="createAddress" roles="guest"/>
<permission type="deleteAddress" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="browse" roles="guest"/>
<permission type="send" roles="guest"/>
<permission type="manage" roles="guest"/>
</security-setting>
</security-settings>
<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
</address-settings>
<addresses>
<address name="exampleTopic">
<multicast>
<queue name="subscriptionExampleTopic" />
</multicast>
</address>
</addresses>
</core>
</configuration>

View File

@ -0,0 +1,111 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xi="http://www.w3.org/2001/XInclude"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq:core ">
<name>0.0.0.0</name>
<persistence-enabled>false</persistence-enabled>
<journal-type>NIO</journal-type>
<!-- should the broker detect dead locks and other issues -->
<critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
<page-sync-timeout>44000</page-sync-timeout>
<acceptors>
<!-- Acceptor for every supported protocol -->
<acceptor name="artemis">tcp://0.0.0.0:5771?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
<permission type="createDurableQueue" roles="guest"/>
<permission type="deleteDurableQueue" roles="guest"/>
<permission type="createAddress" roles="guest"/>
<permission type="deleteAddress" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="browse" roles="guest"/>
<permission type="send" roles="guest"/>
<permission type="manage" roles="guest"/>
</security-setting>
</security-settings>
<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
</address-settings>
<addresses>
<address name="exampleTopic">
<multicast>
<queue name="q2" />
<queue name="q3" />
</multicast>
</address>
</addresses>
</core>
</configuration>

View File

@ -48,6 +48,7 @@ under the License.
<id>examples</id>
<modules>
<module>amqp-sending-messages</module>
<module>amqp-sending-messages-multicast</module>
<module>amqp-receiving-messages</module>
<module>amqp-sending-overssl</module>
<module>disaster-recovery</module>