diff --git a/examples/features/broker-connection/amqp-sending-messages-multicast/pom.xml b/examples/features/broker-connection/amqp-sending-messages-multicast/pom.xml new file mode 100644 index 0000000000..983e88dc70 --- /dev/null +++ b/examples/features/broker-connection/amqp-sending-messages-multicast/pom.xml @@ -0,0 +1,164 @@ + + + + + 4.0.0 + + + org.apache.activemq.examples.broker-connection + broker-connections + 2.19.0-SNAPSHOT + + + amqp-sending-messages-multicast + jar + amqp-sending-messages-queues + + + ${project.basedir}/../../../.. + + + + + org.apache.qpid + qpid-jms-client + + + + + + + org.apache.activemq + artemis-maven-plugin + + + create0 + + create + + + ${noServer} + ${basedir}/target/server0 + true + ${basedir}/target/classes/activemq/server0 + + -Djava.net.preferIPv4Stack=true + + + + create1 + + create + + + ${noServer} + ${basedir}/target/server1 + true + ${basedir}/target/classes/activemq/server1 + + -Djava.net.preferIPv4Stack=true + + + + + start1 + + cli + + + ${noServer} + true + ${basedir}/target/server1 + tcp://localhost:5771 + + run + + server1 + + + + start0 + + cli + + + true + ${noServer} + ${basedir}/target/server0 + tcp://localhost:5660 + + run + + server0 + + + + runClient + + runClient + + + + org.apache.activemq.artemis.jms.example.BrokerConnectionSender + + + + stop0 + + cli + + + ${noServer} + ${basedir}/target/server0 + + stop + + + + + stop1 + + cli + + + ${noServer} + ${basedir}/target/server1 + + stop + + + + + + + org.apache.activemq.examples.broker-connection + amqp-sending-messages-multicast + ${project.version} + + + + + org.apache.maven.plugins + maven-clean-plugin + + + + diff --git a/examples/features/broker-connection/amqp-sending-messages-multicast/readme.md b/examples/features/broker-connection/amqp-sending-messages-multicast/readme.md new file mode 100644 index 0000000000..bd0d781e3b --- /dev/null +++ b/examples/features/broker-connection/amqp-sending-messages-multicast/readme.md @@ -0,0 +1,9 @@ +# AMQP Broker Connection with Senders on Multicast + +To run the example, simply type **mvn verify** from this directory, or **mvn -PnoServer verify** if you want to create and start the broker manually. + +This example demonstrates how you can create a broker connection from one broker towards another broker, and send messages from that broker towards the target server. + +You basically configured the broker connection on broker.xml and this example will give you two working servers where you send messages in one broker and receive it on another broker. + +This example will use a single multicast queue to distribute to another multicast address on another broker. diff --git a/examples/features/broker-connection/amqp-sending-messages-multicast/src/main/java/org/apache/activemq/artemis/jms/example/BrokerConnectionSender.java b/examples/features/broker-connection/amqp-sending-messages-multicast/src/main/java/org/apache/activemq/artemis/jms/example/BrokerConnectionSender.java new file mode 100644 index 0000000000..3305245592 --- /dev/null +++ b/examples/features/broker-connection/amqp-sending-messages-multicast/src/main/java/org/apache/activemq/artemis/jms/example/BrokerConnectionSender.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jms.example; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.qpid.jms.JmsConnectionFactory; + +/** + * This example is demonstrating how messages are transferred from one broker towards another broker + * through the sender operation on a AMQP Broker Connection. + */ +public class BrokerConnectionSender { + + public static void main(final String[] args) throws Exception { + Connection connectionOnServer0 = null; + ConnectionFactory connectionFactoryServer0 = new JmsConnectionFactory("amqp://localhost:5660"); + + Connection connectionOnServer1 = null; + ConnectionFactory connectionFactoryServer1 = new JmsConnectionFactory("amqp://localhost:5771"); + + // to make the example more interesting I'm creating a durable subscription on server1 before we start the consumer on server0 + // this subscription will be reconnected at the end after the sends + try { + connectionOnServer1 = connectionFactoryServer1.createConnection(); + connectionOnServer1.setClientID("id1"); + connectionOnServer1.start(); + Session session = connectionOnServer1.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic("exampleTopic"); + session.createDurableSubscriber(topic, "hello"); + } finally { + connectionOnServer1.close(); + } + + try { + + connectionOnServer0 = connectionFactoryServer0.createConnection(); + + Session session = connectionOnServer0.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Topic topic = session.createTopic("exampleTopic"); + MessageProducer sender = session.createProducer(topic); + for (int i = 0; i < 100; i++) { + sender.send(session.createTextMessage("Hello world n" + i)); + } + } finally { + if (connectionFactoryServer0 != null) { + connectionOnServer0.close(); + } + } + + connectionOnServer1 = null; + connectionFactoryServer1 = new JmsConnectionFactory("amqp://localhost:5771"); + + try { + connectionOnServer1 = connectionFactoryServer1.createConnection(); + connectionOnServer1.setClientID("id1"); + connectionOnServer1.start(); + Session session = connectionOnServer1.createSession(false, Session.AUTO_ACKNOWLEDGE); + + { + Queue queue = session.createQueue("exampleTopic::q2"); + MessageConsumer consumer = session.createConsumer(queue); + for (int i = 0; i < 100; i++) { + TextMessage message = (TextMessage) consumer.receive(5000); + System.out.println("Received message " + message.getText() + " on q2"); + } + } + + { + Queue queue = session.createQueue("exampleTopic::q3"); + MessageConsumer consumer = session.createConsumer(queue); + for (int i = 0; i < 100; i++) { + TextMessage message = (TextMessage) consumer.receive(5000); + System.out.println("Received message " + message.getText() + " on q3"); + } + } + + + // Receiving messages using the topic subscription API + { + Topic topic = session.createTopic("exampleTopic"); + MessageConsumer consumer = session.createDurableSubscriber(topic, "hello"); + for (int i = 0; i < 100; i++) { + TextMessage message = (TextMessage) consumer.receive(5000); + System.out.println("Received message " + message.getText() + " on a topic subscription"); + } + } + } finally { + if (connectionOnServer1 != null) { + connectionOnServer1.close(); + } + } + } +} diff --git a/examples/features/broker-connection/amqp-sending-messages-multicast/src/main/resources/activemq/server0/broker.xml b/examples/features/broker-connection/amqp-sending-messages-multicast/src/main/resources/activemq/server0/broker.xml new file mode 100644 index 0000000000..57ee932eda --- /dev/null +++ b/examples/features/broker-connection/amqp-sending-messages-multicast/src/main/resources/activemq/server0/broker.xml @@ -0,0 +1,117 @@ + + + + + + + + 0.0.0.0 + + + false + + NIO + + + true + + 120000 + + 60000 + + HALT + + + 44000 + + + + tcp://0.0.0.0:5660?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true + + + + + + + + + + + + + + + + + + + + + + + + + + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + true + true + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + true + true + + + + + +
+ + + +
+
+ +
+
diff --git a/examples/features/broker-connection/amqp-sending-messages-multicast/src/main/resources/activemq/server1/broker.xml b/examples/features/broker-connection/amqp-sending-messages-multicast/src/main/resources/activemq/server1/broker.xml new file mode 100644 index 0000000000..8b25279516 --- /dev/null +++ b/examples/features/broker-connection/amqp-sending-messages-multicast/src/main/resources/activemq/server1/broker.xml @@ -0,0 +1,111 @@ + + + + + + + + 0.0.0.0 + + + false + + NIO + + + true + + 120000 + + 60000 + + HALT + + + 44000 + + + + tcp://0.0.0.0:5771?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true + + + + + + + + + + + + + + + + + + + + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + true + true + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + true + true + + + + +
+ + + + +
+
+ +
+
diff --git a/examples/features/broker-connection/pom.xml b/examples/features/broker-connection/pom.xml index db7c22a0a8..dbfa5dcf8b 100644 --- a/examples/features/broker-connection/pom.xml +++ b/examples/features/broker-connection/pom.xml @@ -48,6 +48,7 @@ under the License. examples amqp-sending-messages + amqp-sending-messages-multicast amqp-receiving-messages amqp-sending-overssl disaster-recovery