ARTEMIS-2937 Fixing Tests and some review
This commit is contained in:
parent
05d5db83a4
commit
bf52134dc0
|
@ -466,8 +466,8 @@ public class AMQPSessionCallback implements SessionCallback {
|
||||||
|
|
||||||
RoutingType routingType = null;
|
RoutingType routingType = null;
|
||||||
if (address != null) {
|
if (address != null) {
|
||||||
|
// set Fixed-address producer if the message.properties.to address differs from the producer
|
||||||
if (!address.toString().equals(message.getAddress())) {
|
if (!address.toString().equals(message.getAddress())) {
|
||||||
// set Fixed-address producer if the message.properties.to address differs from the producer
|
|
||||||
message.setAddress(address);
|
message.setAddress(address);
|
||||||
}
|
}
|
||||||
routingType = context.getDefRoutingType();
|
routingType = context.getDefRoutingType();
|
||||||
|
|
|
@ -198,13 +198,6 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
|
||||||
return internalConnectionEntry(remotingConnection, true, saslFactory);
|
return internalConnectionEntry(remotingConnection, true, saslFactory);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* AMQP is an agnostic protocol, client and server.
|
|
||||||
* This method is used also by the AMQPConenctionBridge where there is no acceptor in place.
|
|
||||||
* So, this method is to be used by the AMQPConnectionBridge
|
|
||||||
* @param remotingConnection
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
private ConnectionEntry internalConnectionEntry(Connection remotingConnection, boolean outgoing, ClientSASLFactory saslFactory) {
|
private ConnectionEntry internalConnectionEntry(Connection remotingConnection, boolean outgoing, ClientSASLFactory saslFactory) {
|
||||||
AMQPConnectionCallback connectionCallback = new AMQPConnectionCallback(this, remotingConnection, server.getExecutorFactory().getExecutor(), server);
|
AMQPConnectionCallback connectionCallback = new AMQPConnectionCallback(this, remotingConnection, server.getExecutorFactory().getExecutor(), server);
|
||||||
long ttl = ActiveMQClient.DEFAULT_CONNECTION_TTL;
|
long ttl = ActiveMQClient.DEFAULT_CONNECTION_TTL;
|
||||||
|
|
|
@ -137,6 +137,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void stop() {
|
public void stop() {
|
||||||
|
started = false;
|
||||||
if (connection != null) {
|
if (connection != null) {
|
||||||
connection.close();
|
connection.close();
|
||||||
}
|
}
|
||||||
|
@ -145,7 +146,6 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
|
||||||
if (scheduledFuture != null) {
|
if (scheduledFuture != null) {
|
||||||
scheduledFuture.cancel(true);
|
scheduledFuture.cancel(true);
|
||||||
}
|
}
|
||||||
started = false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -246,14 +246,11 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
|
||||||
protonRemotingConnection = (ActiveMQProtonRemotingConnection) entry.connection;
|
protonRemotingConnection = (ActiveMQProtonRemotingConnection) entry.connection;
|
||||||
connection.getChannel().pipeline().addLast(new AMQPBrokerConnectionChannelHandler(bridgesConnector.getChannelGroup(), protonRemotingConnection.getAmqpConnection().getHandler()));
|
connection.getChannel().pipeline().addLast(new AMQPBrokerConnectionChannelHandler(bridgesConnector.getChannelGroup(), protonRemotingConnection.getAmqpConnection().getHandler()));
|
||||||
|
|
||||||
protonRemotingConnection.getAmqpConnection().runLater(() -> {
|
|
||||||
protonRemotingConnection.getAmqpConnection().open();
|
|
||||||
protonRemotingConnection.getAmqpConnection().flush();
|
|
||||||
});
|
|
||||||
|
|
||||||
session = protonRemotingConnection.getAmqpConnection().getHandler().getConnection().session();
|
session = protonRemotingConnection.getAmqpConnection().getHandler().getConnection().session();
|
||||||
sessionContext = protonRemotingConnection.getAmqpConnection().getSessionExtension(session);
|
sessionContext = protonRemotingConnection.getAmqpConnection().getSessionExtension(session);
|
||||||
|
|
||||||
protonRemotingConnection.getAmqpConnection().runLater(() -> {
|
protonRemotingConnection.getAmqpConnection().runLater(() -> {
|
||||||
|
protonRemotingConnection.getAmqpConnection().open();
|
||||||
session.open();
|
session.open();
|
||||||
protonRemotingConnection.getAmqpConnection().flush();
|
protonRemotingConnection.getAmqpConnection().flush();
|
||||||
});
|
});
|
||||||
|
@ -306,6 +303,10 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void uninstallMirrorController(AMQPMirrorBrokerConnectionElement replicaConfig, ActiveMQServer server) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
/** The reason this method is static is the following:
|
/** The reason this method is static is the following:
|
||||||
*
|
*
|
||||||
* It is returning the snfQueue to the replica, and I needed isolation from the actual instance.
|
* It is returning the snfQueue to the replica, and I needed isolation from the actual instance.
|
||||||
|
@ -388,7 +389,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
receivers.add(queue);
|
receivers.add(queue);
|
||||||
Receiver receiver = session.receiver(queue.getName().toString() + UUIDGenerator.getInstance().generateStringUUID());
|
Receiver receiver = session.receiver(queue.getAddress().toString() + ":" + UUIDGenerator.getInstance().generateStringUUID());
|
||||||
receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
|
receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
|
||||||
receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
|
receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
|
||||||
Target target = new Target();
|
Target target = new Target();
|
||||||
|
@ -433,11 +434,11 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
senders.add(queue);
|
senders.add(queue);
|
||||||
Sender sender = session.sender(targetName + UUIDGenerator.getInstance().generateStringUUID());
|
Sender sender = session.sender(targetName + ":" + UUIDGenerator.getInstance().generateStringUUID());
|
||||||
sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
|
sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
|
||||||
sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
|
sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
|
||||||
Target target = new Target();
|
Target target = new Target();
|
||||||
target.setAddress(queue.getAddress().toString());
|
target.setAddress(targetName);
|
||||||
if (capabilities != null) {
|
if (capabilities != null) {
|
||||||
target.setCapabilities(capabilities);
|
target.setCapabilities(capabilities);
|
||||||
}
|
}
|
||||||
|
@ -484,7 +485,6 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws Exception {
|
public void close() throws Exception {
|
||||||
// TODO implement close
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
|
|
||||||
package org.apache.activemq.artemis.protocol.amqp.connect;
|
package org.apache.activemq.artemis.protocol.amqp.connect;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
|
@ -49,7 +50,7 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon
|
||||||
private volatile boolean started = false;
|
private volatile boolean started = false;
|
||||||
|
|
||||||
List<AMQPBrokerConnectConfiguration> amqpConnectionsConfig;
|
List<AMQPBrokerConnectConfiguration> amqpConnectionsConfig;
|
||||||
List<AMQPBrokerConnection> amqpOutgoingConnections;
|
List<AMQPBrokerConnection> amqpBrokerConnectionList;
|
||||||
ProtonProtocolManager protonProtocolManager;
|
ProtonProtocolManager protonProtocolManager;
|
||||||
|
|
||||||
public AMQPBrokerConnectionManager(ProtonProtocolManagerFactory factory, List<AMQPBrokerConnectConfiguration> amqpConnectionsConfig, ActiveMQServer server) {
|
public AMQPBrokerConnectionManager(ProtonProtocolManagerFactory factory, List<AMQPBrokerConnectConfiguration> amqpConnectionsConfig, ActiveMQServer server) {
|
||||||
|
@ -64,6 +65,7 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon
|
||||||
started = true;
|
started = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
amqpBrokerConnectionList = new ArrayList<>();
|
||||||
|
|
||||||
|
|
||||||
for (AMQPBrokerConnectConfiguration config : amqpConnectionsConfig) {
|
for (AMQPBrokerConnectConfiguration config : amqpConnectionsConfig) {
|
||||||
|
@ -76,6 +78,7 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon
|
||||||
logger.debug("Connecting " + config);
|
logger.debug("Connecting " + config);
|
||||||
}
|
}
|
||||||
AMQPBrokerConnection amqpBrokerConnection = new AMQPBrokerConnection(this, config, protonProtocolManager, server, bridgesConnector);
|
AMQPBrokerConnection amqpBrokerConnection = new AMQPBrokerConnection(this, config, protonProtocolManager, server, bridgesConnector);
|
||||||
|
amqpBrokerConnectionList.add(amqpBrokerConnection);
|
||||||
server.registerBrokerConnection(amqpBrokerConnection);
|
server.registerBrokerConnection(amqpBrokerConnection);
|
||||||
if (config.isAutostart()) {
|
if (config.isAutostart()) {
|
||||||
amqpBrokerConnection.start();
|
amqpBrokerConnection.start();
|
||||||
|
@ -90,10 +93,8 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon
|
||||||
public void stop() throws Exception {
|
public void stop() throws Exception {
|
||||||
if (started) {
|
if (started) {
|
||||||
started = false;
|
started = false;
|
||||||
if (amqpOutgoingConnections != null) {
|
for (AMQPBrokerConnection connection : amqpBrokerConnectionList) {
|
||||||
for (AMQPBrokerConnection connection : amqpOutgoingConnections) {
|
connection.stop();
|
||||||
connection.stop();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -110,8 +111,8 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void connectionDestroyed(Object connectionID) {
|
public void connectionDestroyed(Object connectionID) {
|
||||||
for (AMQPBrokerConnection connection : amqpOutgoingConnections) {
|
for (AMQPBrokerConnection connection : amqpBrokerConnectionList) {
|
||||||
if (connection.getConnection().getID().equals(connectionID)) {
|
if (connection.getConnection() != null && connectionID.equals(connection.getConnection().getID())) {
|
||||||
connection.connectionDestroyed(connectionID);
|
connection.connectionDestroyed(connectionID);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -119,8 +120,8 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void connectionException(Object connectionID, ActiveMQException me) {
|
public void connectionException(Object connectionID, ActiveMQException me) {
|
||||||
for (AMQPBrokerConnection connection : amqpOutgoingConnections) {
|
for (AMQPBrokerConnection connection : amqpBrokerConnectionList) {
|
||||||
if (connection.getConnection().getID().equals(connectionID)) {
|
if (connection.getConnection() != null && connectionID.equals(connection.getConnection().getID())) {
|
||||||
connection.connectionException(connectionID, me);
|
connection.connectionException(connectionID, me);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -129,7 +130,7 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void connectionReadyForWrites(Object connectionID, boolean ready) {
|
public void connectionReadyForWrites(Object connectionID, boolean ready) {
|
||||||
for (AMQPBrokerConnection connection : amqpOutgoingConnections) {
|
for (AMQPBrokerConnection connection : amqpBrokerConnectionList) {
|
||||||
if (connection.getConnection().getID().equals(connectionID)) {
|
if (connection.getConnection().getID().equals(connectionID)) {
|
||||||
connection.connectionReadyForWrites(connectionID, ready);
|
connection.connectionReadyForWrites(connectionID, ready);
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,8 +54,8 @@ public class AMQPMirrorControllerSource implements MirrorController, ActiveMQCom
|
||||||
public static final Symbol DELETE_ADDRESS = Symbol.getSymbol("deleteAddress");
|
public static final Symbol DELETE_ADDRESS = Symbol.getSymbol("deleteAddress");
|
||||||
public static final Symbol CREATE_QUEUE = Symbol.getSymbol("createQueue");
|
public static final Symbol CREATE_QUEUE = Symbol.getSymbol("createQueue");
|
||||||
public static final Symbol DELETE_QUEUE = Symbol.getSymbol("deleteQueue");
|
public static final Symbol DELETE_QUEUE = Symbol.getSymbol("deleteQueue");
|
||||||
public static final Symbol ADDRESS_SCAN_START = Symbol.getSymbol("AddressCanStart");
|
public static final Symbol ADDRESS_SCAN_START = Symbol.getSymbol("addressCanStart");
|
||||||
public static final Symbol ADDRESS_SCAN_END = Symbol.getSymbol("AddressScanEnd");
|
public static final Symbol ADDRESS_SCAN_END = Symbol.getSymbol("addressScanEnd");
|
||||||
public static final Symbol POST_ACK = Symbol.getSymbol("postAck");
|
public static final Symbol POST_ACK = Symbol.getSymbol("postAck");
|
||||||
|
|
||||||
// Delivery annotation property used on mirror control routing and Ack
|
// Delivery annotation property used on mirror control routing and Ack
|
||||||
|
@ -141,16 +141,16 @@ public class AMQPMirrorControllerSource implements MirrorController, ActiveMQCom
|
||||||
|
|
||||||
snfQueue.refUp(ref);
|
snfQueue.refUp(ref);
|
||||||
|
|
||||||
Map<Symbol, Object> symbolObjectMap = new HashMap<>();
|
Map<Symbol, Object> daMap = new HashMap<>();
|
||||||
DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(symbolObjectMap);
|
DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(daMap);
|
||||||
symbolObjectMap.put(INTERNAL_ID, message.getMessageID());
|
daMap.put(INTERNAL_ID, message.getMessageID());
|
||||||
String address = message.getAddress();
|
String address = message.getAddress();
|
||||||
if (address != null) { // this is the message that was set through routing
|
if (address != null) { // this is the message that was set through routing
|
||||||
Properties amqpProperties = getProperties(message);
|
Properties amqpProperties = getProperties(message);
|
||||||
if (amqpProperties == null || !address.equals(amqpProperties.getTo())) {
|
if (amqpProperties == null || !address.equals(amqpProperties.getTo())) {
|
||||||
// We set the internal destination property only if we need to
|
// We set the internal destination property only if we need to
|
||||||
// otherwise we just use the one already set over Properties
|
// otherwise we just use the one already set over Properties
|
||||||
symbolObjectMap.put(INTERNAL_DESTINATION, message.getAddress());
|
daMap.put(INTERNAL_DESTINATION, message.getAddress());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ref.setProtocolData(deliveryAnnotations);
|
ref.setProtocolData(deliveryAnnotations);
|
||||||
|
|
|
@ -362,7 +362,6 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setOwner(PagingStore owner) {
|
public void setOwner(PagingStore owner) {
|
||||||
new Exception("Setting owner as " + owner.getStoreName()).printStackTrace();
|
|
||||||
this.owner = owner;
|
this.owner = owner;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,24 +70,24 @@ Some examples are shown below.
|
||||||
Using address expressions:
|
Using address expressions:
|
||||||
```xml
|
```xml
|
||||||
<broker-connections>
|
<broker-connections>
|
||||||
<amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker">
|
<amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker">
|
||||||
<sender match="queues.#"/>
|
<sender match="queues.#"/>
|
||||||
<!-- notice the local queues for remotequeues.# need to be created on this broker -->
|
<!-- notice the local queues for remotequeues.# need to be created on this broker -->
|
||||||
<receiver match="remotequeues.#"/>
|
<receiver match="remotequeues.#"/>
|
||||||
</amqp-connection>
|
</amqp-connection>
|
||||||
</broker-connections>
|
</broker-connections>
|
||||||
|
|
||||||
<addresses>
|
<addresses>
|
||||||
<address name="remotequeues.A">
|
<address name="remotequeues.A">
|
||||||
<anycast>
|
<anycast>
|
||||||
<queue name="remoteQueueA"/>
|
<queue name="remoteQueueA"/>
|
||||||
</anycast>
|
</anycast>
|
||||||
</address>
|
</address>
|
||||||
<address name="queues.B">
|
<address name="queues.B">
|
||||||
<anycast>
|
<anycast>
|
||||||
<queue name="localQueueB"/>
|
<queue name="localQueueB"/>
|
||||||
</anycast>
|
</anycast>
|
||||||
</address>
|
</address>
|
||||||
</addresses>
|
</addresses>
|
||||||
```
|
```
|
||||||
|
|
||||||
|
@ -122,14 +122,16 @@ Using queue names:
|
||||||
# Peers
|
# Peers
|
||||||
A peer broker connection element is a combination of sender and receivers. The ActiveMQ Artemis broker creates both a sender and a receiver for a peer element, and the endpoint knows how to deal with the pair without creating an infinite loop of sending and receiving messages.
|
A peer broker connection element is a combination of sender and receivers. The ActiveMQ Artemis broker creates both a sender and a receiver for a peer element, and the endpoint knows how to deal with the pair without creating an infinite loop of sending and receiving messages.
|
||||||
|
|
||||||
Currently, [Apache Qpid Dispatch Router](https://qpid.apache.org/components/dispatch-router/index.html) is a peer. ActiveMQ Artemis creates the pair of receivers and sender for each matching destination. These senders and receivers have special configuration to let Qpid Dispatch Router know to collaborate with ActiveMQ Artemis.
|
Currently, [Apache Qpid Dispatch Router](https://qpid.apache.org/components/dispatch-router/index.html) can act as a peer. ActiveMQ Artemis creates the pair of receivers and sender for each matching destination. These senders and receivers have special configuration to let Qpid Dispatch Router know to collaborate with ActiveMQ Artemis.
|
||||||
|
|
||||||
You can experiment with advanced networking scenarios with Qpid Dispatch Router and get a lot of benefit from the AMQP protocol and its ecosystem.
|
You can experiment with advanced networking scenarios with Qpid Dispatch Router and get a lot of benefit from the AMQP protocol and its ecosystem.
|
||||||
|
|
||||||
|
When you add this option, the broker will act as a way point on Qpid Dispatch, and the broker will act as a store point for the messages. For more information refer to the documentation on [Apache Qpid Dispatch Router](https://qpid.apache.org/components/dispatch-router/index.html).
|
||||||
|
|
||||||
With a peer, you have the same properties that you have on a sender and receiver. For example:
|
With a peer, you have the same properties that you have on a sender and receiver. For example:
|
||||||
```xml
|
```xml
|
||||||
<broker-connections>
|
<broker-connections>
|
||||||
<amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker">
|
<amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-router">
|
||||||
<peer match="queues.#"/>
|
<peer match="queues.#"/>
|
||||||
</amqp-connection>
|
</amqp-connection>
|
||||||
</broker-connections>
|
</broker-connections>
|
||||||
|
@ -137,12 +139,12 @@ With a peer, you have the same properties that you have on a sender and receiver
|
||||||
<addresses>
|
<addresses>
|
||||||
<address name="queues.A">
|
<address name="queues.A">
|
||||||
<anycast>
|
<anycast>
|
||||||
<queue name="localQueueA"/>
|
<queue name="queues.A"/>
|
||||||
</anycast>
|
</anycast>
|
||||||
</address>
|
</address>
|
||||||
<address name="queues.B">
|
<address name="queues.B">
|
||||||
<anycast>
|
<anycast>
|
||||||
<queue name="localQueueB"/>
|
<queue name="queues.B"/>
|
||||||
</anycast>
|
</anycast>
|
||||||
</address>
|
</address>
|
||||||
</addresses>
|
</addresses>
|
||||||
|
@ -180,8 +182,15 @@ An example of a mirror configuration is shown below:
|
||||||
</broker-connections>
|
</broker-connections>
|
||||||
```
|
```
|
||||||
|
|
||||||
## Catch up on Mirror
|
*Important*: One broker can have multiple replicas (1 to many). However a replica site can only have a single source. Make sure you do not connect multiple replicas to a single mirror.
|
||||||
The broker will not send past events over the mirror. As the broker sends and receives messages, only a natural catch up would eventually happen.
|
|
||||||
|
## Pre existing messages
|
||||||
|
The broker will not send pre existing messages through the mirror. So, If you add mirror to your configuration and the journal had pre existing messages these messages will not be sent.
|
||||||
|
|
||||||
|
## Broker Connection Stop and Disconnect
|
||||||
|
Once you start the broker connection with a mirror the mirror events will always be sent to the temporary queue configured at the `source-mirror-address`.
|
||||||
|
|
||||||
|
It is possible to stop the broker connection with the operation stopBrokerConnection(connectionName) on the ServerControl, but it is only effective to disconnect the brokers, while the mirror events are always captured.
|
||||||
|
|
||||||
## Disaster & Recovery considerations
|
## Disaster & Recovery considerations
|
||||||
As you use the mirror option to replicate data across datacenters, you have to take a few considerations:
|
As you use the mirror option to replicate data across datacenters, you have to take a few considerations:
|
||||||
|
@ -195,8 +204,7 @@ As you use the mirror option to replicate data across datacenters, you have to t
|
||||||
* You can have a disabled broker connection to be enabled after the disaster.
|
* You can have a disabled broker connection to be enabled after the disaster.
|
||||||
|
|
||||||
|
|
||||||
## Mirror example sending acknowledgements
|
## Mirror example with Failback
|
||||||
|
|
||||||
On this example lets play with two brokers:
|
On this example lets play with two brokers:
|
||||||
- sourceBroker
|
- sourceBroker
|
||||||
- replicaBroker
|
- replicaBroker
|
||||||
|
@ -211,7 +219,7 @@ Add this configuration on sourceBroker:
|
||||||
</broker-connections>
|
</broker-connections>
|
||||||
```
|
```
|
||||||
|
|
||||||
On the replicaBroker, add disabled broker connection for failing back after a disaster, and also set the acceptors with autoStart=false
|
On the replicaBroker, add a disabled broker connection for failing back after a disaster, and also set the acceptors with autoStart=false
|
||||||
|
|
||||||
```xml
|
```xml
|
||||||
|
|
||||||
|
|
|
@ -29,7 +29,7 @@ under the License.
|
||||||
|
|
||||||
<artifactId>amqp-receiving-messages</artifactId>
|
<artifactId>amqp-receiving-messages</artifactId>
|
||||||
<packaging>jar</packaging>
|
<packaging>jar</packaging>
|
||||||
<name>ActiveMQ Artemis JMS Client Side Load Balancing Example</name>
|
<name>amqp-receiving-messages</name>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
|
<activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
|
||||||
|
@ -103,7 +103,7 @@ under the License.
|
||||||
<spawn>true</spawn>
|
<spawn>true</spawn>
|
||||||
<ignore>${noServer}</ignore>
|
<ignore>${noServer}</ignore>
|
||||||
<location>${basedir}/target/server0</location>
|
<location>${basedir}/target/server0</location>
|
||||||
<testURI>tcp://localhost:5671</testURI>
|
<testURI>tcp://localhost:5660</testURI>
|
||||||
<args>
|
<args>
|
||||||
<param>run</param>
|
<param>run</param>
|
||||||
</args>
|
</args>
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
# AMQP Broker Connection with Senders
|
# AMQP Broker Connection with Receivers
|
||||||
|
|
||||||
To run the example, simply type **mvn verify** from this directory, or **mvn -PnoServer verify** if you want to create and start the broker manually.
|
To run the example, simply type **mvn verify** from this directory, or **mvn -PnoServer verify** if you want to create and start the broker manually.
|
||||||
|
|
||||||
|
|
|
@ -27,11 +27,8 @@ import javax.jms.TextMessage;
|
||||||
import org.apache.qpid.jms.JmsConnectionFactory;
|
import org.apache.qpid.jms.JmsConnectionFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This example demonstrates how sessions created from a single connection can be load
|
* This example is demonstrating how messages are transferred from one broker towards another broker
|
||||||
* balanced across the different nodes of the cluster.
|
* through the receiver operation on a AMQP Broker Connection.
|
||||||
* <p>
|
|
||||||
* In this example there are three nodes and we use a round-robin client side load-balancing
|
|
||||||
* policy.
|
|
||||||
*/
|
*/
|
||||||
public class BrokerConnectionReceiver {
|
public class BrokerConnectionReceiver {
|
||||||
|
|
||||||
|
@ -60,7 +57,7 @@ public class BrokerConnectionReceiver {
|
||||||
// Step 2. create a connection on server1, and receive a few messages.
|
// Step 2. create a connection on server1, and receive a few messages.
|
||||||
// the sender on the broker conneciton will take care of the transfer.
|
// the sender on the broker conneciton will take care of the transfer.
|
||||||
Connection connectionOnServer1 = null;
|
Connection connectionOnServer1 = null;
|
||||||
ConnectionFactory connectionFactoryServer1 = new JmsConnectionFactory("amqp://localhost:5671");
|
ConnectionFactory connectionFactoryServer1 = new JmsConnectionFactory("amqp://localhost:5660");
|
||||||
|
|
||||||
try {
|
try {
|
||||||
connectionOnServer1 = connectionFactoryServer1.createConnection();
|
connectionOnServer1 = connectionFactoryServer1.createConnection();
|
||||||
|
|
|
@ -47,7 +47,7 @@ under the License.
|
||||||
|
|
||||||
<acceptors>
|
<acceptors>
|
||||||
<!-- Acceptor for every supported protocol -->
|
<!-- Acceptor for every supported protocol -->
|
||||||
<acceptor name="artemis">tcp://0.0.0.0:5671?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
|
<acceptor name="artemis">tcp://0.0.0.0:5660?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
|
||||||
|
|
||||||
</acceptors>
|
</acceptors>
|
||||||
|
|
||||||
|
|
|
@ -29,7 +29,7 @@ under the License.
|
||||||
|
|
||||||
<artifactId>amqp-sending-messages</artifactId>
|
<artifactId>amqp-sending-messages</artifactId>
|
||||||
<packaging>jar</packaging>
|
<packaging>jar</packaging>
|
||||||
<name>ActiveMQ Artemis JMS Client Side Load Balancing Example</name>
|
<name>amqp-sending-messages</name>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
|
<activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
|
||||||
|
@ -87,7 +87,7 @@ under the License.
|
||||||
<ignore>${noServer}</ignore>
|
<ignore>${noServer}</ignore>
|
||||||
<spawn>true</spawn>
|
<spawn>true</spawn>
|
||||||
<location>${basedir}/target/server1</location>
|
<location>${basedir}/target/server1</location>
|
||||||
<testURI>tcp://localhost:5672</testURI>
|
<testURI>tcp://localhost:5771</testURI>
|
||||||
<args>
|
<args>
|
||||||
<param>run</param>
|
<param>run</param>
|
||||||
</args>
|
</args>
|
||||||
|
@ -103,7 +103,7 @@ under the License.
|
||||||
<spawn>true</spawn>
|
<spawn>true</spawn>
|
||||||
<ignore>${noServer}</ignore>
|
<ignore>${noServer}</ignore>
|
||||||
<location>${basedir}/target/server0</location>
|
<location>${basedir}/target/server0</location>
|
||||||
<testURI>tcp://localhost:5671</testURI>
|
<testURI>tcp://localhost:5660</testURI>
|
||||||
<args>
|
<args>
|
||||||
<param>run</param>
|
<param>run</param>
|
||||||
</args>
|
</args>
|
||||||
|
|
|
@ -28,13 +28,13 @@ import org.apache.qpid.jms.JmsConnectionFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This example is demonstrating how messages are transferred from one broker towards another broker
|
* This example is demonstrating how messages are transferred from one broker towards another broker
|
||||||
* through the sender element on a AMQP Broker Connection.
|
* through the sender operation on a AMQP Broker Connection.
|
||||||
*/
|
*/
|
||||||
public class BrokerConnectionSender {
|
public class BrokerConnectionSender {
|
||||||
|
|
||||||
public static void main(final String[] args) throws Exception {
|
public static void main(final String[] args) throws Exception {
|
||||||
Connection connectionOnServer0 = null;
|
Connection connectionOnServer0 = null;
|
||||||
ConnectionFactory connectionFactoryServer0 = new JmsConnectionFactory("amqp://localhost:5671");
|
ConnectionFactory connectionFactoryServer0 = new JmsConnectionFactory("amqp://localhost:5660");
|
||||||
|
|
||||||
// Step 1. Create a connection on server0, and send a few messages
|
// Step 1. Create a connection on server0, and send a few messages
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -47,7 +47,7 @@ under the License.
|
||||||
|
|
||||||
<acceptors>
|
<acceptors>
|
||||||
<!-- Acceptor for every supported protocol -->
|
<!-- Acceptor for every supported protocol -->
|
||||||
<acceptor name="artemis">tcp://0.0.0.0:5671?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
|
<acceptor name="artemis">tcp://0.0.0.0:5660?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
|
||||||
|
|
||||||
</acceptors>
|
</acceptors>
|
||||||
|
|
||||||
|
|
|
@ -29,7 +29,7 @@ under the License.
|
||||||
|
|
||||||
<artifactId>amqp-ssl-enabled</artifactId>
|
<artifactId>amqp-ssl-enabled</artifactId>
|
||||||
<packaging>jar</packaging>
|
<packaging>jar</packaging>
|
||||||
<name>Connecting to another broker over SSL</name>
|
<name>amqp-ssl-enabled</name>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
|
<activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
|
||||||
|
@ -102,7 +102,7 @@ under the License.
|
||||||
<spawn>true</spawn>
|
<spawn>true</spawn>
|
||||||
<ignore>${noServer}</ignore>
|
<ignore>${noServer}</ignore>
|
||||||
<location>${basedir}/target/server0</location>
|
<location>${basedir}/target/server0</location>
|
||||||
<testURI>tcp://localhost:5671</testURI>
|
<testURI>tcp://localhost:5660</testURI>
|
||||||
<args>
|
<args>
|
||||||
<param>run</param>
|
<param>run</param>
|
||||||
</args>
|
</args>
|
||||||
|
|
|
@ -1,15 +1,9 @@
|
||||||
# JMS SSL Example
|
# AMQP Broker Connection with Senders and SSL
|
||||||
|
|
||||||
To run the example, simply type **mvn verify** from this directory, or **mvn -PnoServer verify** if you want to start and create the broker manually.
|
To run the example, simply type **mvn verify** from this directory, or **mvn -PnoServer verify** if you want to create and start the broker manually.
|
||||||
|
|
||||||
This example shows you how to configure SSL with ActiveMQ Artemis to send and receive message.
|
This example demonstrates how you can create a broker connection from one broker towards another broker, and send messages from that broker towards the target server.
|
||||||
|
|
||||||
Using SSL can make your messaging applications interact with ActiveMQ Artemis securely. An application can be secured transparently without extra coding effort. To secure your messaging application with SSL, you need to configure connector and acceptor as follows:
|
You basically configured the broker connection on broker.xml and this example will give you two working servers where you send messages in one broker and receive it on another broker.
|
||||||
|
|
||||||
<acceptor name="netty-ssl-acceptor">tcp://localhost:5500?sslEnabled=true;keyStorePath=activemq.example.keystore;keyStorePassword=activemqexample</acceptor>
|
The Broker connection on this example is configured to use SSL. The client connections here are using regular connections.
|
||||||
|
|
||||||
In the configuration, the `activemq.example.keystore` is the key store file holding the server's certificate. The `activemq.example.truststore` is the file holding the certificates which the client trusts (i.e. the server's certificate exported from activemq.example.keystore). They are generated via the following commands:
|
|
||||||
|
|
||||||
* `keytool -genkey -keystore activemq.example.keystore -storepass activemqexample -keypass activemqexample -dname "CN=ActiveMQ Artemis Server, OU=Artemis, O=ActiveMQ, L=AMQ, S=AMQ, C=AMQ" -keyalg RSA`
|
|
||||||
* `keytool -export -keystore activemq.example.keystore -file server-side-cert.cer -storepass activemqexample`
|
|
||||||
* `keytool -import -keystore activemq.example.truststore -file server-side-cert.cer -storepass activemqexample -keypass activemqexample -noprompt`
|
|
||||||
|
|
|
@ -29,12 +29,13 @@ import org.apache.qpid.jms.JmsConnectionFactory;
|
||||||
/**
|
/**
|
||||||
* This example is demonstrating how messages are transferred from one broker towards another broker
|
* This example is demonstrating how messages are transferred from one broker towards another broker
|
||||||
* through the sender element on a AMQP Broker Connection.
|
* through the sender element on a AMQP Broker Connection.
|
||||||
|
* The sender operation on the broker connection will use a SSL connection.
|
||||||
*/
|
*/
|
||||||
public class BrokerConnectionSenderSSL {
|
public class BrokerConnectionSenderSSL {
|
||||||
|
|
||||||
public static void main(final String[] args) throws Exception {
|
public static void main(final String[] args) throws Exception {
|
||||||
Connection connectionOnServer0 = null;
|
Connection connectionOnServer0 = null;
|
||||||
ConnectionFactory connectionFactoryServer0 = new JmsConnectionFactory("amqp://localhost:5671");
|
ConnectionFactory connectionFactoryServer0 = new JmsConnectionFactory("amqp://localhost:5660");
|
||||||
|
|
||||||
// Step 1. Create a connection on server0, and send a few messages
|
// Step 1. Create a connection on server0, and send a few messages
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -31,7 +31,7 @@ under the License.
|
||||||
<!-- Acceptors -->
|
<!-- Acceptors -->
|
||||||
<acceptors>
|
<acceptors>
|
||||||
<!-- keystores will be found automatically if they are on the classpath -->
|
<!-- keystores will be found automatically if they are on the classpath -->
|
||||||
<acceptor name="artemis">tcp://0.0.0.0:5671?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
|
<acceptor name="artemis">tcp://0.0.0.0:5660?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
|
||||||
</acceptors>
|
</acceptors>
|
||||||
|
|
||||||
<broker-connections>
|
<broker-connections>
|
||||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
|
||||||
import org.apache.activemq.artemis.tests.util.CFUtil;
|
import org.apache.activemq.artemis.tests.util.CFUtil;
|
||||||
import org.apache.activemq.artemis.tests.util.Wait;
|
import org.apache.activemq.artemis.tests.util.Wait;
|
||||||
import org.apache.activemq.artemis.utils.ExecuteUtil;
|
import org.apache.activemq.artemis.utils.ExecuteUtil;
|
||||||
|
import org.jboss.logging.Logger;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Assume;
|
import org.junit.Assume;
|
||||||
|
@ -46,6 +47,7 @@ import org.junit.Test;
|
||||||
|
|
||||||
/** This test will only be executed if you have qdrouterd available on your system, otherwise is ignored by an assume exception. */
|
/** This test will only be executed if you have qdrouterd available on your system, otherwise is ignored by an assume exception. */
|
||||||
public class QpidDispatchPeerTest extends AmqpClientTestSupport {
|
public class QpidDispatchPeerTest extends AmqpClientTestSupport {
|
||||||
|
private static final Logger logger = Logger.getLogger(QpidDispatchPeerTest.class);
|
||||||
|
|
||||||
ExecuteUtil.ProcessHolder qpidProcess;
|
ExecuteUtil.ProcessHolder qpidProcess;
|
||||||
|
|
||||||
|
@ -58,8 +60,8 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport {
|
||||||
int result = ExecuteUtil.runCommand(true, "qdrouterd", "--version");
|
int result = ExecuteUtil.runCommand(true, "qdrouterd", "--version");
|
||||||
Assume.assumeTrue("qdrouterd does not exist", result == 0);
|
Assume.assumeTrue("qdrouterd does not exist", result == 0);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
e.printStackTrace();
|
logger.debug(e.getMessage(), e);
|
||||||
Assume.assumeNoException(e);
|
Assume.assumeNoException("qdrouterd does not exist", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue