This closes #3317
This commit is contained in:
commit
4f8afd5dd1
|
@ -466,8 +466,8 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
|
||||
RoutingType routingType = null;
|
||||
if (address != null) {
|
||||
// set Fixed-address producer if the message.properties.to address differs from the producer
|
||||
if (!address.toString().equals(message.getAddress())) {
|
||||
// set Fixed-address producer if the message.properties.to address differs from the producer
|
||||
message.setAddress(address);
|
||||
}
|
||||
routingType = context.getDefRoutingType();
|
||||
|
|
|
@ -198,13 +198,6 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
|
|||
return internalConnectionEntry(remotingConnection, true, saslFactory);
|
||||
}
|
||||
|
||||
/**
|
||||
* AMQP is an agnostic protocol, client and server.
|
||||
* This method is used also by the AMQPConenctionBridge where there is no acceptor in place.
|
||||
* So, this method is to be used by the AMQPConnectionBridge
|
||||
* @param remotingConnection
|
||||
* @return
|
||||
*/
|
||||
private ConnectionEntry internalConnectionEntry(Connection remotingConnection, boolean outgoing, ClientSASLFactory saslFactory) {
|
||||
AMQPConnectionCallback connectionCallback = new AMQPConnectionCallback(this, remotingConnection, server.getExecutorFactory().getExecutor(), server);
|
||||
long ttl = ActiveMQClient.DEFAULT_CONNECTION_TTL;
|
||||
|
|
|
@ -137,6 +137,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
|
|||
|
||||
@Override
|
||||
public void stop() {
|
||||
started = false;
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
}
|
||||
|
@ -145,7 +146,6 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
|
|||
if (scheduledFuture != null) {
|
||||
scheduledFuture.cancel(true);
|
||||
}
|
||||
started = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -246,14 +246,11 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
|
|||
protonRemotingConnection = (ActiveMQProtonRemotingConnection) entry.connection;
|
||||
connection.getChannel().pipeline().addLast(new AMQPBrokerConnectionChannelHandler(bridgesConnector.getChannelGroup(), protonRemotingConnection.getAmqpConnection().getHandler()));
|
||||
|
||||
protonRemotingConnection.getAmqpConnection().runLater(() -> {
|
||||
protonRemotingConnection.getAmqpConnection().open();
|
||||
protonRemotingConnection.getAmqpConnection().flush();
|
||||
});
|
||||
|
||||
session = protonRemotingConnection.getAmqpConnection().getHandler().getConnection().session();
|
||||
sessionContext = protonRemotingConnection.getAmqpConnection().getSessionExtension(session);
|
||||
|
||||
protonRemotingConnection.getAmqpConnection().runLater(() -> {
|
||||
protonRemotingConnection.getAmqpConnection().open();
|
||||
session.open();
|
||||
protonRemotingConnection.getAmqpConnection().flush();
|
||||
});
|
||||
|
@ -306,6 +303,10 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
|
|||
}
|
||||
}
|
||||
|
||||
private static void uninstallMirrorController(AMQPMirrorBrokerConnectionElement replicaConfig, ActiveMQServer server) {
|
||||
|
||||
}
|
||||
|
||||
/** The reason this method is static is the following:
|
||||
*
|
||||
* It is returning the snfQueue to the replica, and I needed isolation from the actual instance.
|
||||
|
@ -388,7 +389,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
|
|||
return;
|
||||
}
|
||||
receivers.add(queue);
|
||||
Receiver receiver = session.receiver(queue.getName().toString() + UUIDGenerator.getInstance().generateStringUUID());
|
||||
Receiver receiver = session.receiver(queue.getAddress().toString() + ":" + UUIDGenerator.getInstance().generateStringUUID());
|
||||
receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
|
||||
receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
|
||||
Target target = new Target();
|
||||
|
@ -433,11 +434,11 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
|
|||
return;
|
||||
}
|
||||
senders.add(queue);
|
||||
Sender sender = session.sender(targetName + UUIDGenerator.getInstance().generateStringUUID());
|
||||
Sender sender = session.sender(targetName + ":" + UUIDGenerator.getInstance().generateStringUUID());
|
||||
sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
|
||||
sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
|
||||
Target target = new Target();
|
||||
target.setAddress(queue.getAddress().toString());
|
||||
target.setAddress(targetName);
|
||||
if (capabilities != null) {
|
||||
target.setCapabilities(capabilities);
|
||||
}
|
||||
|
@ -484,7 +485,6 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
|
|||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
// TODO implement close
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.activemq.artemis.protocol.amqp.connect;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
|
@ -49,7 +50,7 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon
|
|||
private volatile boolean started = false;
|
||||
|
||||
List<AMQPBrokerConnectConfiguration> amqpConnectionsConfig;
|
||||
List<AMQPBrokerConnection> amqpOutgoingConnections;
|
||||
List<AMQPBrokerConnection> amqpBrokerConnectionList;
|
||||
ProtonProtocolManager protonProtocolManager;
|
||||
|
||||
public AMQPBrokerConnectionManager(ProtonProtocolManagerFactory factory, List<AMQPBrokerConnectConfiguration> amqpConnectionsConfig, ActiveMQServer server) {
|
||||
|
@ -64,6 +65,7 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon
|
|||
started = true;
|
||||
}
|
||||
|
||||
amqpBrokerConnectionList = new ArrayList<>();
|
||||
|
||||
|
||||
for (AMQPBrokerConnectConfiguration config : amqpConnectionsConfig) {
|
||||
|
@ -76,6 +78,7 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon
|
|||
logger.debug("Connecting " + config);
|
||||
}
|
||||
AMQPBrokerConnection amqpBrokerConnection = new AMQPBrokerConnection(this, config, protonProtocolManager, server, bridgesConnector);
|
||||
amqpBrokerConnectionList.add(amqpBrokerConnection);
|
||||
server.registerBrokerConnection(amqpBrokerConnection);
|
||||
if (config.isAutostart()) {
|
||||
amqpBrokerConnection.start();
|
||||
|
@ -90,10 +93,8 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon
|
|||
public void stop() throws Exception {
|
||||
if (started) {
|
||||
started = false;
|
||||
if (amqpOutgoingConnections != null) {
|
||||
for (AMQPBrokerConnection connection : amqpOutgoingConnections) {
|
||||
connection.stop();
|
||||
}
|
||||
for (AMQPBrokerConnection connection : amqpBrokerConnectionList) {
|
||||
connection.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -110,8 +111,8 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon
|
|||
|
||||
@Override
|
||||
public void connectionDestroyed(Object connectionID) {
|
||||
for (AMQPBrokerConnection connection : amqpOutgoingConnections) {
|
||||
if (connection.getConnection().getID().equals(connectionID)) {
|
||||
for (AMQPBrokerConnection connection : amqpBrokerConnectionList) {
|
||||
if (connection.getConnection() != null && connectionID.equals(connection.getConnection().getID())) {
|
||||
connection.connectionDestroyed(connectionID);
|
||||
}
|
||||
}
|
||||
|
@ -119,8 +120,8 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon
|
|||
|
||||
@Override
|
||||
public void connectionException(Object connectionID, ActiveMQException me) {
|
||||
for (AMQPBrokerConnection connection : amqpOutgoingConnections) {
|
||||
if (connection.getConnection().getID().equals(connectionID)) {
|
||||
for (AMQPBrokerConnection connection : amqpBrokerConnectionList) {
|
||||
if (connection.getConnection() != null && connectionID.equals(connection.getConnection().getID())) {
|
||||
connection.connectionException(connectionID, me);
|
||||
}
|
||||
}
|
||||
|
@ -129,7 +130,7 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon
|
|||
|
||||
@Override
|
||||
public void connectionReadyForWrites(Object connectionID, boolean ready) {
|
||||
for (AMQPBrokerConnection connection : amqpOutgoingConnections) {
|
||||
for (AMQPBrokerConnection connection : amqpBrokerConnectionList) {
|
||||
if (connection.getConnection().getID().equals(connectionID)) {
|
||||
connection.connectionReadyForWrites(connectionID, ready);
|
||||
}
|
||||
|
|
|
@ -54,8 +54,8 @@ public class AMQPMirrorControllerSource implements MirrorController, ActiveMQCom
|
|||
public static final Symbol DELETE_ADDRESS = Symbol.getSymbol("deleteAddress");
|
||||
public static final Symbol CREATE_QUEUE = Symbol.getSymbol("createQueue");
|
||||
public static final Symbol DELETE_QUEUE = Symbol.getSymbol("deleteQueue");
|
||||
public static final Symbol ADDRESS_SCAN_START = Symbol.getSymbol("AddressCanStart");
|
||||
public static final Symbol ADDRESS_SCAN_END = Symbol.getSymbol("AddressScanEnd");
|
||||
public static final Symbol ADDRESS_SCAN_START = Symbol.getSymbol("addressCanStart");
|
||||
public static final Symbol ADDRESS_SCAN_END = Symbol.getSymbol("addressScanEnd");
|
||||
public static final Symbol POST_ACK = Symbol.getSymbol("postAck");
|
||||
|
||||
// Delivery annotation property used on mirror control routing and Ack
|
||||
|
@ -141,16 +141,16 @@ public class AMQPMirrorControllerSource implements MirrorController, ActiveMQCom
|
|||
|
||||
snfQueue.refUp(ref);
|
||||
|
||||
Map<Symbol, Object> symbolObjectMap = new HashMap<>();
|
||||
DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(symbolObjectMap);
|
||||
symbolObjectMap.put(INTERNAL_ID, message.getMessageID());
|
||||
Map<Symbol, Object> daMap = new HashMap<>();
|
||||
DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(daMap);
|
||||
daMap.put(INTERNAL_ID, message.getMessageID());
|
||||
String address = message.getAddress();
|
||||
if (address != null) { // this is the message that was set through routing
|
||||
Properties amqpProperties = getProperties(message);
|
||||
if (amqpProperties == null || !address.equals(amqpProperties.getTo())) {
|
||||
// We set the internal destination property only if we need to
|
||||
// otherwise we just use the one already set over Properties
|
||||
symbolObjectMap.put(INTERNAL_DESTINATION, message.getAddress());
|
||||
daMap.put(INTERNAL_DESTINATION, message.getAddress());
|
||||
}
|
||||
}
|
||||
ref.setProtocolData(deliveryAnnotations);
|
||||
|
|
|
@ -362,7 +362,6 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
|
|||
|
||||
@Override
|
||||
public void setOwner(PagingStore owner) {
|
||||
new Exception("Setting owner as " + owner.getStoreName()).printStackTrace();
|
||||
this.owner = owner;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -70,24 +70,24 @@ Some examples are shown below.
|
|||
Using address expressions:
|
||||
```xml
|
||||
<broker-connections>
|
||||
<amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker">
|
||||
<sender match="queues.#"/>
|
||||
<!-- notice the local queues for remotequeues.# need to be created on this broker -->
|
||||
<receiver match="remotequeues.#"/>
|
||||
</amqp-connection>
|
||||
<amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker">
|
||||
<sender match="queues.#"/>
|
||||
<!-- notice the local queues for remotequeues.# need to be created on this broker -->
|
||||
<receiver match="remotequeues.#"/>
|
||||
</amqp-connection>
|
||||
</broker-connections>
|
||||
|
||||
<addresses>
|
||||
<address name="remotequeues.A">
|
||||
<anycast>
|
||||
<queue name="remoteQueueA"/>
|
||||
</anycast>
|
||||
</address>
|
||||
<address name="queues.B">
|
||||
<anycast>
|
||||
<queue name="localQueueB"/>
|
||||
</anycast>
|
||||
</address>
|
||||
<address name="remotequeues.A">
|
||||
<anycast>
|
||||
<queue name="remoteQueueA"/>
|
||||
</anycast>
|
||||
</address>
|
||||
<address name="queues.B">
|
||||
<anycast>
|
||||
<queue name="localQueueB"/>
|
||||
</anycast>
|
||||
</address>
|
||||
</addresses>
|
||||
```
|
||||
|
||||
|
@ -122,14 +122,16 @@ Using queue names:
|
|||
# Peers
|
||||
A peer broker connection element is a combination of sender and receivers. The ActiveMQ Artemis broker creates both a sender and a receiver for a peer element, and the endpoint knows how to deal with the pair without creating an infinite loop of sending and receiving messages.
|
||||
|
||||
Currently, [Apache Qpid Dispatch Router](https://qpid.apache.org/components/dispatch-router/index.html) is a peer. ActiveMQ Artemis creates the pair of receivers and sender for each matching destination. These senders and receivers have special configuration to let Qpid Dispatch Router know to collaborate with ActiveMQ Artemis.
|
||||
Currently, [Apache Qpid Dispatch Router](https://qpid.apache.org/components/dispatch-router/index.html) can act as a peer. ActiveMQ Artemis creates the pair of receivers and sender for each matching destination. These senders and receivers have special configuration to let Qpid Dispatch Router know to collaborate with ActiveMQ Artemis.
|
||||
|
||||
You can experiment with advanced networking scenarios with Qpid Dispatch Router and get a lot of benefit from the AMQP protocol and its ecosystem.
|
||||
|
||||
When you add this option, the broker will act as a way point on Qpid Dispatch, and the broker will act as a store point for the messages. For more information refer to the documentation on [Apache Qpid Dispatch Router](https://qpid.apache.org/components/dispatch-router/index.html).
|
||||
|
||||
With a peer, you have the same properties that you have on a sender and receiver. For example:
|
||||
```xml
|
||||
<broker-connections>
|
||||
<amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker">
|
||||
<amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-router">
|
||||
<peer match="queues.#"/>
|
||||
</amqp-connection>
|
||||
</broker-connections>
|
||||
|
@ -137,12 +139,12 @@ With a peer, you have the same properties that you have on a sender and receiver
|
|||
<addresses>
|
||||
<address name="queues.A">
|
||||
<anycast>
|
||||
<queue name="localQueueA"/>
|
||||
<queue name="queues.A"/>
|
||||
</anycast>
|
||||
</address>
|
||||
<address name="queues.B">
|
||||
<anycast>
|
||||
<queue name="localQueueB"/>
|
||||
<queue name="queues.B"/>
|
||||
</anycast>
|
||||
</address>
|
||||
</addresses>
|
||||
|
@ -180,8 +182,15 @@ An example of a mirror configuration is shown below:
|
|||
</broker-connections>
|
||||
```
|
||||
|
||||
## Catch up on Mirror
|
||||
The broker will not send past events over the mirror. As the broker sends and receives messages, only a natural catch up would eventually happen.
|
||||
*Important*: One broker can have multiple replicas (1 to many). However a replica site can only have a single source. Make sure you do not connect multiple replicas to a single mirror.
|
||||
|
||||
## Pre existing messages
|
||||
The broker will not send pre existing messages through the mirror. So, If you add mirror to your configuration and the journal had pre existing messages these messages will not be sent.
|
||||
|
||||
## Broker Connection Stop and Disconnect
|
||||
Once you start the broker connection with a mirror the mirror events will always be sent to the temporary queue configured at the `source-mirror-address`.
|
||||
|
||||
It is possible to stop the broker connection with the operation stopBrokerConnection(connectionName) on the ServerControl, but it is only effective to disconnect the brokers, while the mirror events are always captured.
|
||||
|
||||
## Disaster & Recovery considerations
|
||||
As you use the mirror option to replicate data across datacenters, you have to take a few considerations:
|
||||
|
@ -195,8 +204,7 @@ As you use the mirror option to replicate data across datacenters, you have to t
|
|||
* You can have a disabled broker connection to be enabled after the disaster.
|
||||
|
||||
|
||||
## Mirror example sending acknowledgements
|
||||
|
||||
## Mirror example with Failback
|
||||
On this example lets play with two brokers:
|
||||
- sourceBroker
|
||||
- replicaBroker
|
||||
|
@ -211,7 +219,7 @@ Add this configuration on sourceBroker:
|
|||
</broker-connections>
|
||||
```
|
||||
|
||||
On the replicaBroker, add disabled broker connection for failing back after a disaster, and also set the acceptors with autoStart=false
|
||||
On the replicaBroker, add a disabled broker connection for failing back after a disaster, and also set the acceptors with autoStart=false
|
||||
|
||||
```xml
|
||||
|
||||
|
|
|
@ -29,7 +29,7 @@ under the License.
|
|||
|
||||
<artifactId>amqp-receiving-messages</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<name>ActiveMQ Artemis JMS Client Side Load Balancing Example</name>
|
||||
<name>amqp-receiving-messages</name>
|
||||
|
||||
<properties>
|
||||
<activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
|
||||
|
@ -103,7 +103,7 @@ under the License.
|
|||
<spawn>true</spawn>
|
||||
<ignore>${noServer}</ignore>
|
||||
<location>${basedir}/target/server0</location>
|
||||
<testURI>tcp://localhost:5671</testURI>
|
||||
<testURI>tcp://localhost:5660</testURI>
|
||||
<args>
|
||||
<param>run</param>
|
||||
</args>
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
# AMQP Broker Connection with Senders
|
||||
# AMQP Broker Connection with Receivers
|
||||
|
||||
To run the example, simply type **mvn verify** from this directory, or **mvn -PnoServer verify** if you want to create and start the broker manually.
|
||||
|
||||
|
|
|
@ -27,11 +27,8 @@ import javax.jms.TextMessage;
|
|||
import org.apache.qpid.jms.JmsConnectionFactory;
|
||||
|
||||
/**
|
||||
* This example demonstrates how sessions created from a single connection can be load
|
||||
* balanced across the different nodes of the cluster.
|
||||
* <p>
|
||||
* In this example there are three nodes and we use a round-robin client side load-balancing
|
||||
* policy.
|
||||
* This example is demonstrating how messages are transferred from one broker towards another broker
|
||||
* through the receiver operation on a AMQP Broker Connection.
|
||||
*/
|
||||
public class BrokerConnectionReceiver {
|
||||
|
||||
|
@ -60,7 +57,7 @@ public class BrokerConnectionReceiver {
|
|||
// Step 2. create a connection on server1, and receive a few messages.
|
||||
// the sender on the broker conneciton will take care of the transfer.
|
||||
Connection connectionOnServer1 = null;
|
||||
ConnectionFactory connectionFactoryServer1 = new JmsConnectionFactory("amqp://localhost:5671");
|
||||
ConnectionFactory connectionFactoryServer1 = new JmsConnectionFactory("amqp://localhost:5660");
|
||||
|
||||
try {
|
||||
connectionOnServer1 = connectionFactoryServer1.createConnection();
|
||||
|
|
|
@ -47,7 +47,7 @@ under the License.
|
|||
|
||||
<acceptors>
|
||||
<!-- Acceptor for every supported protocol -->
|
||||
<acceptor name="artemis">tcp://0.0.0.0:5671?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
|
||||
<acceptor name="artemis">tcp://0.0.0.0:5660?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
|
||||
|
||||
</acceptors>
|
||||
|
||||
|
|
|
@ -29,7 +29,7 @@ under the License.
|
|||
|
||||
<artifactId>amqp-sending-messages</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<name>ActiveMQ Artemis JMS Client Side Load Balancing Example</name>
|
||||
<name>amqp-sending-messages</name>
|
||||
|
||||
<properties>
|
||||
<activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
|
||||
|
@ -87,7 +87,7 @@ under the License.
|
|||
<ignore>${noServer}</ignore>
|
||||
<spawn>true</spawn>
|
||||
<location>${basedir}/target/server1</location>
|
||||
<testURI>tcp://localhost:5672</testURI>
|
||||
<testURI>tcp://localhost:5771</testURI>
|
||||
<args>
|
||||
<param>run</param>
|
||||
</args>
|
||||
|
@ -103,7 +103,7 @@ under the License.
|
|||
<spawn>true</spawn>
|
||||
<ignore>${noServer}</ignore>
|
||||
<location>${basedir}/target/server0</location>
|
||||
<testURI>tcp://localhost:5671</testURI>
|
||||
<testURI>tcp://localhost:5660</testURI>
|
||||
<args>
|
||||
<param>run</param>
|
||||
</args>
|
||||
|
|
|
@ -28,13 +28,13 @@ import org.apache.qpid.jms.JmsConnectionFactory;
|
|||
|
||||
/**
|
||||
* This example is demonstrating how messages are transferred from one broker towards another broker
|
||||
* through the sender element on a AMQP Broker Connection.
|
||||
* through the sender operation on a AMQP Broker Connection.
|
||||
*/
|
||||
public class BrokerConnectionSender {
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
Connection connectionOnServer0 = null;
|
||||
ConnectionFactory connectionFactoryServer0 = new JmsConnectionFactory("amqp://localhost:5671");
|
||||
ConnectionFactory connectionFactoryServer0 = new JmsConnectionFactory("amqp://localhost:5660");
|
||||
|
||||
// Step 1. Create a connection on server0, and send a few messages
|
||||
try {
|
||||
|
|
|
@ -47,7 +47,7 @@ under the License.
|
|||
|
||||
<acceptors>
|
||||
<!-- Acceptor for every supported protocol -->
|
||||
<acceptor name="artemis">tcp://0.0.0.0:5671?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
|
||||
<acceptor name="artemis">tcp://0.0.0.0:5660?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
|
||||
|
||||
</acceptors>
|
||||
|
||||
|
|
|
@ -29,7 +29,7 @@ under the License.
|
|||
|
||||
<artifactId>amqp-ssl-enabled</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<name>Connecting to another broker over SSL</name>
|
||||
<name>amqp-ssl-enabled</name>
|
||||
|
||||
<properties>
|
||||
<activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
|
||||
|
@ -102,7 +102,7 @@ under the License.
|
|||
<spawn>true</spawn>
|
||||
<ignore>${noServer}</ignore>
|
||||
<location>${basedir}/target/server0</location>
|
||||
<testURI>tcp://localhost:5671</testURI>
|
||||
<testURI>tcp://localhost:5660</testURI>
|
||||
<args>
|
||||
<param>run</param>
|
||||
</args>
|
||||
|
|
|
@ -1,15 +1,9 @@
|
|||
# JMS SSL Example
|
||||
|
||||
To run the example, simply type **mvn verify** from this directory, or **mvn -PnoServer verify** if you want to start and create the broker manually.
|
||||
|
||||
This example shows you how to configure SSL with ActiveMQ Artemis to send and receive message.
|
||||
|
||||
Using SSL can make your messaging applications interact with ActiveMQ Artemis securely. An application can be secured transparently without extra coding effort. To secure your messaging application with SSL, you need to configure connector and acceptor as follows:
|
||||
|
||||
<acceptor name="netty-ssl-acceptor">tcp://localhost:5500?sslEnabled=true;keyStorePath=activemq.example.keystore;keyStorePassword=activemqexample</acceptor>
|
||||
|
||||
In the configuration, the `activemq.example.keystore` is the key store file holding the server's certificate. The `activemq.example.truststore` is the file holding the certificates which the client trusts (i.e. the server's certificate exported from activemq.example.keystore). They are generated via the following commands:
|
||||
|
||||
* `keytool -genkey -keystore activemq.example.keystore -storepass activemqexample -keypass activemqexample -dname "CN=ActiveMQ Artemis Server, OU=Artemis, O=ActiveMQ, L=AMQ, S=AMQ, C=AMQ" -keyalg RSA`
|
||||
* `keytool -export -keystore activemq.example.keystore -file server-side-cert.cer -storepass activemqexample`
|
||||
* `keytool -import -keystore activemq.example.truststore -file server-side-cert.cer -storepass activemqexample -keypass activemqexample -noprompt`
|
||||
# AMQP Broker Connection with Senders and SSL
|
||||
|
||||
To run the example, simply type **mvn verify** from this directory, or **mvn -PnoServer verify** if you want to create and start the broker manually.
|
||||
|
||||
This example demonstrates how you can create a broker connection from one broker towards another broker, and send messages from that broker towards the target server.
|
||||
|
||||
You basically configured the broker connection on broker.xml and this example will give you two working servers where you send messages in one broker and receive it on another broker.
|
||||
|
||||
The Broker connection on this example is configured to use SSL. The client connections here are using regular connections.
|
||||
|
|
|
@ -29,12 +29,13 @@ import org.apache.qpid.jms.JmsConnectionFactory;
|
|||
/**
|
||||
* This example is demonstrating how messages are transferred from one broker towards another broker
|
||||
* through the sender element on a AMQP Broker Connection.
|
||||
* The sender operation on the broker connection will use a SSL connection.
|
||||
*/
|
||||
public class BrokerConnectionSenderSSL {
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
Connection connectionOnServer0 = null;
|
||||
ConnectionFactory connectionFactoryServer0 = new JmsConnectionFactory("amqp://localhost:5671");
|
||||
ConnectionFactory connectionFactoryServer0 = new JmsConnectionFactory("amqp://localhost:5660");
|
||||
|
||||
// Step 1. Create a connection on server0, and send a few messages
|
||||
try {
|
||||
|
|
|
@ -31,7 +31,7 @@ under the License.
|
|||
<!-- Acceptors -->
|
||||
<acceptors>
|
||||
<!-- keystores will be found automatically if they are on the classpath -->
|
||||
<acceptor name="artemis">tcp://0.0.0.0:5671?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
|
||||
<acceptor name="artemis">tcp://0.0.0.0:5660?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
|
||||
</acceptors>
|
||||
|
||||
<broker-connections>
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
|
|||
import org.apache.activemq.artemis.tests.util.CFUtil;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.apache.activemq.artemis.utils.ExecuteUtil;
|
||||
import org.jboss.logging.Logger;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Assume;
|
||||
|
@ -46,6 +47,7 @@ import org.junit.Test;
|
|||
|
||||
/** This test will only be executed if you have qdrouterd available on your system, otherwise is ignored by an assume exception. */
|
||||
public class QpidDispatchPeerTest extends AmqpClientTestSupport {
|
||||
private static final Logger logger = Logger.getLogger(QpidDispatchPeerTest.class);
|
||||
|
||||
ExecuteUtil.ProcessHolder qpidProcess;
|
||||
|
||||
|
@ -58,8 +60,8 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport {
|
|||
int result = ExecuteUtil.runCommand(true, "qdrouterd", "--version");
|
||||
Assume.assumeTrue("qdrouterd does not exist", result == 0);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
Assume.assumeNoException(e);
|
||||
logger.debug(e.getMessage(), e);
|
||||
Assume.assumeNoException("qdrouterd does not exist", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue