From bf52134dc0bb0ebefbe6228b26ae5326a6b2baec Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 28 Oct 2020 12:34:48 -0400 Subject: [PATCH] ARTEMIS-2937 Fixing Tests and some review --- .../amqp/broker/AMQPSessionCallback.java | 2 +- .../amqp/broker/ProtonProtocolManager.java | 7 --- .../amqp/connect/AMQPBrokerConnection.java | 20 +++---- .../connect/AMQPBrokerConnectionManager.java | 21 +++---- .../mirror/AMQPMirrorControllerSource.java | 12 ++-- .../server/impl/MessageReferenceImpl.java | 1 - .../user-manual/en/amqp-broker-connections.md | 56 +++++++++++-------- .../amqp-receiving-messages/pom.xml | 4 +- .../amqp-receiving-messages/readme.md | 2 +- .../jms/example/BrokerConnectionReceiver.java | 9 +-- .../resources/activemq/server0/broker.xml | 2 +- .../amqp-sending-messages/pom.xml | 6 +- .../jms/example/BrokerConnectionSender.java | 4 +- .../resources/activemq/server0/broker.xml | 2 +- .../amqp-sending-overssl/pom.xml | 4 +- .../amqp-sending-overssl/readme.md | 24 +++----- .../example/BrokerConnectionSenderSSL.java | 3 +- .../resources/activemq/server0/broker.xml | 2 +- .../amqp/connect/QpidDispatchPeerTest.java | 6 +- 19 files changed, 91 insertions(+), 96 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index d680169c82..cc5616a358 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -466,8 +466,8 @@ public class AMQPSessionCallback implements SessionCallback { RoutingType routingType = null; if (address != null) { + // set Fixed-address producer if the message.properties.to address differs from the producer if (!address.toString().equals(message.getAddress())) { - // set Fixed-address producer if the message.properties.to address differs from the producer message.setAddress(address); } routingType = context.getDefRoutingType(); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java index d46ad291d8..666abb1fa9 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java @@ -198,13 +198,6 @@ public class ProtonProtocolManager extends AbstractProtocolManager { - protonRemotingConnection.getAmqpConnection().open(); - protonRemotingConnection.getAmqpConnection().flush(); - }); - session = protonRemotingConnection.getAmqpConnection().getHandler().getConnection().session(); sessionContext = protonRemotingConnection.getAmqpConnection().getSessionExtension(session); + protonRemotingConnection.getAmqpConnection().runLater(() -> { + protonRemotingConnection.getAmqpConnection().open(); session.open(); protonRemotingConnection.getAmqpConnection().flush(); }); @@ -306,6 +303,10 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener, } } + private static void uninstallMirrorController(AMQPMirrorBrokerConnectionElement replicaConfig, ActiveMQServer server) { + + } + /** The reason this method is static is the following: * * It is returning the snfQueue to the replica, and I needed isolation from the actual instance. @@ -388,7 +389,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener, return; } receivers.add(queue); - Receiver receiver = session.receiver(queue.getName().toString() + UUIDGenerator.getInstance().generateStringUUID()); + Receiver receiver = session.receiver(queue.getAddress().toString() + ":" + UUIDGenerator.getInstance().generateStringUUID()); receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST); Target target = new Target(); @@ -433,11 +434,11 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener, return; } senders.add(queue); - Sender sender = session.sender(targetName + UUIDGenerator.getInstance().generateStringUUID()); + Sender sender = session.sender(targetName + ":" + UUIDGenerator.getInstance().generateStringUUID()); sender.setSenderSettleMode(SenderSettleMode.UNSETTLED); sender.setReceiverSettleMode(ReceiverSettleMode.FIRST); Target target = new Target(); - target.setAddress(queue.getAddress().toString()); + target.setAddress(targetName); if (capabilities != null) { target.setCapabilities(capabilities); } @@ -484,7 +485,6 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener, @Override public void close() throws Exception { - // TODO implement close } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java index 7a17dc74c3..90382e49e3 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.protocol.amqp.connect; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executor; import java.util.concurrent.locks.Lock; @@ -49,7 +50,7 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon private volatile boolean started = false; List amqpConnectionsConfig; - List amqpOutgoingConnections; + List amqpBrokerConnectionList; ProtonProtocolManager protonProtocolManager; public AMQPBrokerConnectionManager(ProtonProtocolManagerFactory factory, List amqpConnectionsConfig, ActiveMQServer server) { @@ -64,6 +65,7 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon started = true; } + amqpBrokerConnectionList = new ArrayList<>(); for (AMQPBrokerConnectConfiguration config : amqpConnectionsConfig) { @@ -76,6 +78,7 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon logger.debug("Connecting " + config); } AMQPBrokerConnection amqpBrokerConnection = new AMQPBrokerConnection(this, config, protonProtocolManager, server, bridgesConnector); + amqpBrokerConnectionList.add(amqpBrokerConnection); server.registerBrokerConnection(amqpBrokerConnection); if (config.isAutostart()) { amqpBrokerConnection.start(); @@ -90,10 +93,8 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon public void stop() throws Exception { if (started) { started = false; - if (amqpOutgoingConnections != null) { - for (AMQPBrokerConnection connection : amqpOutgoingConnections) { - connection.stop(); - } + for (AMQPBrokerConnection connection : amqpBrokerConnectionList) { + connection.stop(); } } } @@ -110,8 +111,8 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon @Override public void connectionDestroyed(Object connectionID) { - for (AMQPBrokerConnection connection : amqpOutgoingConnections) { - if (connection.getConnection().getID().equals(connectionID)) { + for (AMQPBrokerConnection connection : amqpBrokerConnectionList) { + if (connection.getConnection() != null && connectionID.equals(connection.getConnection().getID())) { connection.connectionDestroyed(connectionID); } } @@ -119,8 +120,8 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon @Override public void connectionException(Object connectionID, ActiveMQException me) { - for (AMQPBrokerConnection connection : amqpOutgoingConnections) { - if (connection.getConnection().getID().equals(connectionID)) { + for (AMQPBrokerConnection connection : amqpBrokerConnectionList) { + if (connection.getConnection() != null && connectionID.equals(connection.getConnection().getID())) { connection.connectionException(connectionID, me); } } @@ -129,7 +130,7 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon @Override public void connectionReadyForWrites(Object connectionID, boolean ready) { - for (AMQPBrokerConnection connection : amqpOutgoingConnections) { + for (AMQPBrokerConnection connection : amqpBrokerConnectionList) { if (connection.getConnection().getID().equals(connectionID)) { connection.connectionReadyForWrites(connectionID, ready); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java index 8fa6fe542b..645b386162 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java @@ -54,8 +54,8 @@ public class AMQPMirrorControllerSource implements MirrorController, ActiveMQCom public static final Symbol DELETE_ADDRESS = Symbol.getSymbol("deleteAddress"); public static final Symbol CREATE_QUEUE = Symbol.getSymbol("createQueue"); public static final Symbol DELETE_QUEUE = Symbol.getSymbol("deleteQueue"); - public static final Symbol ADDRESS_SCAN_START = Symbol.getSymbol("AddressCanStart"); - public static final Symbol ADDRESS_SCAN_END = Symbol.getSymbol("AddressScanEnd"); + public static final Symbol ADDRESS_SCAN_START = Symbol.getSymbol("addressCanStart"); + public static final Symbol ADDRESS_SCAN_END = Symbol.getSymbol("addressScanEnd"); public static final Symbol POST_ACK = Symbol.getSymbol("postAck"); // Delivery annotation property used on mirror control routing and Ack @@ -141,16 +141,16 @@ public class AMQPMirrorControllerSource implements MirrorController, ActiveMQCom snfQueue.refUp(ref); - Map symbolObjectMap = new HashMap<>(); - DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(symbolObjectMap); - symbolObjectMap.put(INTERNAL_ID, message.getMessageID()); + Map daMap = new HashMap<>(); + DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(daMap); + daMap.put(INTERNAL_ID, message.getMessageID()); String address = message.getAddress(); if (address != null) { // this is the message that was set through routing Properties amqpProperties = getProperties(message); if (amqpProperties == null || !address.equals(amqpProperties.getTo())) { // We set the internal destination property only if we need to // otherwise we just use the one already set over Properties - symbolObjectMap.put(INTERNAL_DESTINATION, message.getAddress()); + daMap.put(INTERNAL_DESTINATION, message.getAddress()); } } ref.setProtocolData(deliveryAnnotations); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java index 34aacf5635..09b3650ffd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java @@ -362,7 +362,6 @@ public class MessageReferenceImpl extends LinkedListImpl.Node - - - - - + + + + + -
- - - -
-
- - - -
+
+ + + +
+
+ + + +
``` @@ -122,14 +122,16 @@ Using queue names: # Peers A peer broker connection element is a combination of sender and receivers. The ActiveMQ Artemis broker creates both a sender and a receiver for a peer element, and the endpoint knows how to deal with the pair without creating an infinite loop of sending and receiving messages. -Currently, [Apache Qpid Dispatch Router](https://qpid.apache.org/components/dispatch-router/index.html) is a peer. ActiveMQ Artemis creates the pair of receivers and sender for each matching destination. These senders and receivers have special configuration to let Qpid Dispatch Router know to collaborate with ActiveMQ Artemis. +Currently, [Apache Qpid Dispatch Router](https://qpid.apache.org/components/dispatch-router/index.html) can act as a peer. ActiveMQ Artemis creates the pair of receivers and sender for each matching destination. These senders and receivers have special configuration to let Qpid Dispatch Router know to collaborate with ActiveMQ Artemis. You can experiment with advanced networking scenarios with Qpid Dispatch Router and get a lot of benefit from the AMQP protocol and its ecosystem. +When you add this option, the broker will act as a way point on Qpid Dispatch, and the broker will act as a store point for the messages. For more information refer to the documentation on [Apache Qpid Dispatch Router](https://qpid.apache.org/components/dispatch-router/index.html). + With a peer, you have the same properties that you have on a sender and receiver. For example: ```xml - + @@ -137,12 +139,12 @@ With a peer, you have the same properties that you have on a sender and receiver
- +
- +
@@ -180,8 +182,15 @@ An example of a mirror configuration is shown below: ``` -## Catch up on Mirror -The broker will not send past events over the mirror. As the broker sends and receives messages, only a natural catch up would eventually happen. +*Important*: One broker can have multiple replicas (1 to many). However a replica site can only have a single source. Make sure you do not connect multiple replicas to a single mirror. + +## Pre existing messages +The broker will not send pre existing messages through the mirror. So, If you add mirror to your configuration and the journal had pre existing messages these messages will not be sent. + +## Broker Connection Stop and Disconnect +Once you start the broker connection with a mirror the mirror events will always be sent to the temporary queue configured at the `source-mirror-address`. + +It is possible to stop the broker connection with the operation stopBrokerConnection(connectionName) on the ServerControl, but it is only effective to disconnect the brokers, while the mirror events are always captured. ## Disaster & Recovery considerations As you use the mirror option to replicate data across datacenters, you have to take a few considerations: @@ -195,8 +204,7 @@ As you use the mirror option to replicate data across datacenters, you have to t * You can have a disabled broker connection to be enabled after the disaster. -## Mirror example sending acknowledgements - +## Mirror example with Failback On this example lets play with two brokers: - sourceBroker - replicaBroker @@ -211,7 +219,7 @@ Add this configuration on sourceBroker: ``` -On the replicaBroker, add disabled broker connection for failing back after a disaster, and also set the acceptors with autoStart=false +On the replicaBroker, add a disabled broker connection for failing back after a disaster, and also set the acceptors with autoStart=false ```xml diff --git a/examples/features/broker-connection/amqp-receiving-messages/pom.xml b/examples/features/broker-connection/amqp-receiving-messages/pom.xml index e9f0c8f448..1cf9c38705 100644 --- a/examples/features/broker-connection/amqp-receiving-messages/pom.xml +++ b/examples/features/broker-connection/amqp-receiving-messages/pom.xml @@ -29,7 +29,7 @@ under the License. amqp-receiving-messages jar - ActiveMQ Artemis JMS Client Side Load Balancing Example + amqp-receiving-messages ${project.basedir}/../../../.. @@ -103,7 +103,7 @@ under the License. true ${noServer} ${basedir}/target/server0 - tcp://localhost:5671 + tcp://localhost:5660 run diff --git a/examples/features/broker-connection/amqp-receiving-messages/readme.md b/examples/features/broker-connection/amqp-receiving-messages/readme.md index 6cf4de1564..8b08677ef6 100644 --- a/examples/features/broker-connection/amqp-receiving-messages/readme.md +++ b/examples/features/broker-connection/amqp-receiving-messages/readme.md @@ -1,4 +1,4 @@ -# AMQP Broker Connection with Senders +# AMQP Broker Connection with Receivers To run the example, simply type **mvn verify** from this directory, or **mvn -PnoServer verify** if you want to create and start the broker manually. diff --git a/examples/features/broker-connection/amqp-receiving-messages/src/main/java/org/apache/activemq/artemis/jms/example/BrokerConnectionReceiver.java b/examples/features/broker-connection/amqp-receiving-messages/src/main/java/org/apache/activemq/artemis/jms/example/BrokerConnectionReceiver.java index a1f1a1b5a4..eb1ed2754f 100644 --- a/examples/features/broker-connection/amqp-receiving-messages/src/main/java/org/apache/activemq/artemis/jms/example/BrokerConnectionReceiver.java +++ b/examples/features/broker-connection/amqp-receiving-messages/src/main/java/org/apache/activemq/artemis/jms/example/BrokerConnectionReceiver.java @@ -27,11 +27,8 @@ import javax.jms.TextMessage; import org.apache.qpid.jms.JmsConnectionFactory; /** - * This example demonstrates how sessions created from a single connection can be load - * balanced across the different nodes of the cluster. - *

- * In this example there are three nodes and we use a round-robin client side load-balancing - * policy. + * This example is demonstrating how messages are transferred from one broker towards another broker + * through the receiver operation on a AMQP Broker Connection. */ public class BrokerConnectionReceiver { @@ -60,7 +57,7 @@ public class BrokerConnectionReceiver { // Step 2. create a connection on server1, and receive a few messages. // the sender on the broker conneciton will take care of the transfer. Connection connectionOnServer1 = null; - ConnectionFactory connectionFactoryServer1 = new JmsConnectionFactory("amqp://localhost:5671"); + ConnectionFactory connectionFactoryServer1 = new JmsConnectionFactory("amqp://localhost:5660"); try { connectionOnServer1 = connectionFactoryServer1.createConnection(); diff --git a/examples/features/broker-connection/amqp-receiving-messages/src/main/resources/activemq/server0/broker.xml b/examples/features/broker-connection/amqp-receiving-messages/src/main/resources/activemq/server0/broker.xml index e499a12ff7..db04d874c6 100644 --- a/examples/features/broker-connection/amqp-receiving-messages/src/main/resources/activemq/server0/broker.xml +++ b/examples/features/broker-connection/amqp-receiving-messages/src/main/resources/activemq/server0/broker.xml @@ -47,7 +47,7 @@ under the License. - tcp://0.0.0.0:5671?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true + tcp://0.0.0.0:5660?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true diff --git a/examples/features/broker-connection/amqp-sending-messages/pom.xml b/examples/features/broker-connection/amqp-sending-messages/pom.xml index f728babc7b..3960f124e5 100644 --- a/examples/features/broker-connection/amqp-sending-messages/pom.xml +++ b/examples/features/broker-connection/amqp-sending-messages/pom.xml @@ -29,7 +29,7 @@ under the License. amqp-sending-messages jar - ActiveMQ Artemis JMS Client Side Load Balancing Example + amqp-sending-messages ${project.basedir}/../../../.. @@ -87,7 +87,7 @@ under the License. ${noServer} true ${basedir}/target/server1 - tcp://localhost:5672 + tcp://localhost:5771 run @@ -103,7 +103,7 @@ under the License. true ${noServer} ${basedir}/target/server0 - tcp://localhost:5671 + tcp://localhost:5660 run diff --git a/examples/features/broker-connection/amqp-sending-messages/src/main/java/org/apache/activemq/artemis/jms/example/BrokerConnectionSender.java b/examples/features/broker-connection/amqp-sending-messages/src/main/java/org/apache/activemq/artemis/jms/example/BrokerConnectionSender.java index 3407aac7ac..22fabf7ef6 100644 --- a/examples/features/broker-connection/amqp-sending-messages/src/main/java/org/apache/activemq/artemis/jms/example/BrokerConnectionSender.java +++ b/examples/features/broker-connection/amqp-sending-messages/src/main/java/org/apache/activemq/artemis/jms/example/BrokerConnectionSender.java @@ -28,13 +28,13 @@ import org.apache.qpid.jms.JmsConnectionFactory; /** * This example is demonstrating how messages are transferred from one broker towards another broker - * through the sender element on a AMQP Broker Connection. + * through the sender operation on a AMQP Broker Connection. */ public class BrokerConnectionSender { public static void main(final String[] args) throws Exception { Connection connectionOnServer0 = null; - ConnectionFactory connectionFactoryServer0 = new JmsConnectionFactory("amqp://localhost:5671"); + ConnectionFactory connectionFactoryServer0 = new JmsConnectionFactory("amqp://localhost:5660"); // Step 1. Create a connection on server0, and send a few messages try { diff --git a/examples/features/broker-connection/amqp-sending-messages/src/main/resources/activemq/server0/broker.xml b/examples/features/broker-connection/amqp-sending-messages/src/main/resources/activemq/server0/broker.xml index f9fc84284a..0e26e63596 100644 --- a/examples/features/broker-connection/amqp-sending-messages/src/main/resources/activemq/server0/broker.xml +++ b/examples/features/broker-connection/amqp-sending-messages/src/main/resources/activemq/server0/broker.xml @@ -47,7 +47,7 @@ under the License. - tcp://0.0.0.0:5671?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true + tcp://0.0.0.0:5660?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true diff --git a/examples/features/broker-connection/amqp-sending-overssl/pom.xml b/examples/features/broker-connection/amqp-sending-overssl/pom.xml index 507fa10dcd..c1dc3b83ca 100644 --- a/examples/features/broker-connection/amqp-sending-overssl/pom.xml +++ b/examples/features/broker-connection/amqp-sending-overssl/pom.xml @@ -29,7 +29,7 @@ under the License. amqp-ssl-enabled jar - Connecting to another broker over SSL + amqp-ssl-enabled ${project.basedir}/../../../.. @@ -102,7 +102,7 @@ under the License. true ${noServer} ${basedir}/target/server0 - tcp://localhost:5671 + tcp://localhost:5660 run diff --git a/examples/features/broker-connection/amqp-sending-overssl/readme.md b/examples/features/broker-connection/amqp-sending-overssl/readme.md index 6038f6da90..322ccd3cbd 100644 --- a/examples/features/broker-connection/amqp-sending-overssl/readme.md +++ b/examples/features/broker-connection/amqp-sending-overssl/readme.md @@ -1,15 +1,9 @@ -# JMS SSL Example - -To run the example, simply type **mvn verify** from this directory, or **mvn -PnoServer verify** if you want to start and create the broker manually. - -This example shows you how to configure SSL with ActiveMQ Artemis to send and receive message. - -Using SSL can make your messaging applications interact with ActiveMQ Artemis securely. An application can be secured transparently without extra coding effort. To secure your messaging application with SSL, you need to configure connector and acceptor as follows: - - tcp://localhost:5500?sslEnabled=true;keyStorePath=activemq.example.keystore;keyStorePassword=activemqexample - -In the configuration, the `activemq.example.keystore` is the key store file holding the server's certificate. The `activemq.example.truststore` is the file holding the certificates which the client trusts (i.e. the server's certificate exported from activemq.example.keystore). They are generated via the following commands: - -* `keytool -genkey -keystore activemq.example.keystore -storepass activemqexample -keypass activemqexample -dname "CN=ActiveMQ Artemis Server, OU=Artemis, O=ActiveMQ, L=AMQ, S=AMQ, C=AMQ" -keyalg RSA` -* `keytool -export -keystore activemq.example.keystore -file server-side-cert.cer -storepass activemqexample` -* `keytool -import -keystore activemq.example.truststore -file server-side-cert.cer -storepass activemqexample -keypass activemqexample -noprompt` \ No newline at end of file +# AMQP Broker Connection with Senders and SSL + +To run the example, simply type **mvn verify** from this directory, or **mvn -PnoServer verify** if you want to create and start the broker manually. + +This example demonstrates how you can create a broker connection from one broker towards another broker, and send messages from that broker towards the target server. + +You basically configured the broker connection on broker.xml and this example will give you two working servers where you send messages in one broker and receive it on another broker. + +The Broker connection on this example is configured to use SSL. The client connections here are using regular connections. diff --git a/examples/features/broker-connection/amqp-sending-overssl/src/main/java/org/apache/activemq/artemis/jms/example/BrokerConnectionSenderSSL.java b/examples/features/broker-connection/amqp-sending-overssl/src/main/java/org/apache/activemq/artemis/jms/example/BrokerConnectionSenderSSL.java index a1bf376120..bc0ab978f0 100644 --- a/examples/features/broker-connection/amqp-sending-overssl/src/main/java/org/apache/activemq/artemis/jms/example/BrokerConnectionSenderSSL.java +++ b/examples/features/broker-connection/amqp-sending-overssl/src/main/java/org/apache/activemq/artemis/jms/example/BrokerConnectionSenderSSL.java @@ -29,12 +29,13 @@ import org.apache.qpid.jms.JmsConnectionFactory; /** * This example is demonstrating how messages are transferred from one broker towards another broker * through the sender element on a AMQP Broker Connection. + * The sender operation on the broker connection will use a SSL connection. */ public class BrokerConnectionSenderSSL { public static void main(final String[] args) throws Exception { Connection connectionOnServer0 = null; - ConnectionFactory connectionFactoryServer0 = new JmsConnectionFactory("amqp://localhost:5671"); + ConnectionFactory connectionFactoryServer0 = new JmsConnectionFactory("amqp://localhost:5660"); // Step 1. Create a connection on server0, and send a few messages try { diff --git a/examples/features/broker-connection/amqp-sending-overssl/src/main/resources/activemq/server0/broker.xml b/examples/features/broker-connection/amqp-sending-overssl/src/main/resources/activemq/server0/broker.xml index b2e9e7ef5a..8aea83cf33 100644 --- a/examples/features/broker-connection/amqp-sending-overssl/src/main/resources/activemq/server0/broker.xml +++ b/examples/features/broker-connection/amqp-sending-overssl/src/main/resources/activemq/server0/broker.xml @@ -31,7 +31,7 @@ under the License. - tcp://0.0.0.0:5671?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true + tcp://0.0.0.0:5660?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java index 93a4137a60..100e1c957b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java @@ -37,6 +37,7 @@ import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.utils.ExecuteUtil; +import org.jboss.logging.Logger; import org.junit.After; import org.junit.Assert; import org.junit.Assume; @@ -46,6 +47,7 @@ import org.junit.Test; /** This test will only be executed if you have qdrouterd available on your system, otherwise is ignored by an assume exception. */ public class QpidDispatchPeerTest extends AmqpClientTestSupport { + private static final Logger logger = Logger.getLogger(QpidDispatchPeerTest.class); ExecuteUtil.ProcessHolder qpidProcess; @@ -58,8 +60,8 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport { int result = ExecuteUtil.runCommand(true, "qdrouterd", "--version"); Assume.assumeTrue("qdrouterd does not exist", result == 0); } catch (Exception e) { - e.printStackTrace(); - Assume.assumeNoException(e); + logger.debug(e.getMessage(), e); + Assume.assumeNoException("qdrouterd does not exist", e); } }