This commit is contained in:
Clebert Suconic 2020-10-29 15:02:39 -04:00
commit 52fa82738e
12 changed files with 70 additions and 80 deletions

View File

@ -289,13 +289,13 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
if (bridgeManager.isStarted() && started) {
if (brokerConnectConfiguration.getReconnectAttempts() < 0 || retryCounter < brokerConnectConfiguration.getReconnectAttempts()) {
retryCounter++;
ActiveMQAMQPProtocolLogger.LOGGER.retryConnection(brokerConnectConfiguration.getName(), host, port, retryCounter, brokerConnectConfiguration.getReconnectAttempts());
ActiveMQAMQPProtocolLogger.LOGGER.retryConnection(brokerConnectConfiguration.getName(), host + ":" + port, retryCounter, brokerConnectConfiguration.getReconnectAttempts());
if (logger.isDebugEnabled()) {
logger.debug("Reconnecting in " + brokerConnectConfiguration.getRetryInterval() + ", this is the " + retryCounter + " of " + brokerConnectConfiguration.getReconnectAttempts());
}
reconnectFuture = scheduledExecutorService.schedule(() -> connectExecutor.execute(() -> doConnect()), brokerConnectConfiguration.getRetryInterval(), TimeUnit.MILLISECONDS);
} else {
ActiveMQAMQPProtocolLogger.LOGGER.retryConnectionFailed(brokerConnectConfiguration.getName(), host, port, retryCounter, brokerConnectConfiguration.getReconnectAttempts());
ActiveMQAMQPProtocolLogger.LOGGER.retryConnectionFailed(brokerConnectConfiguration.getName(), host + ":" + port, retryCounter);
if (logger.isDebugEnabled()) {
logger.debug("no more reconnections as the retry counter reached " + retryCounter + " out of " + brokerConnectConfiguration.getReconnectAttempts());
}

View File

@ -51,13 +51,13 @@ public interface ActiveMQAMQPProtocolLogger extends BasicLogger {
@LogMessage(level = Logger.Level.WARN)
@Message(id = 111001, value = "\n*******************************************************************************************************************************" +
"\nCould not re-establish AMQP Server Connection {0} on {1}:{2} after {3} retries with a total configured of {4}" +
"\nCould not re-establish AMQP Server Connection {0} on {1} after {2} retries" +
"\n*******************************************************************************************************************************\n", format = Message.Format.MESSAGE_FORMAT)
void retryConnectionFailed(String name, String host, int port, int currentRetry, int maxRetry);
void retryConnectionFailed(String name, String hostAndPort, int currentRetry);
@LogMessage(level = Logger.Level.INFO)
@Message(id = 111002, value = "\n*******************************************************************************************************************************" +
"\nRetrying Server AMQP Connection {0} on {1}:{2} retry {3} of {4}" +
"\nRetrying Server AMQP Connection {0} on {1} retry {2} of {3}" +
"\n*******************************************************************************************************************************\n", format = Message.Format.MESSAGE_FORMAT)
void retryConnection(String name, String host, int port, int currentRetry, int maxRetry);
void retryConnection(String name, String hostAndPort, int currentRetry, int maxRetry);
}

View File

@ -1911,7 +1911,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
connectionElement = amqpMirrorConnectionElement;
connectionElement.setType(AMQPBrokerConnectionAddressType.MIRROR);
} else {
String match = getAttributeValue(e2, "match");
String match = getAttributeValue(e2, "address-match");
String queue = getAttributeValue(e2, "queue-name");
connectionElement = new AMQPBrokerConnectionElement();
connectionElement.setMatchAddress(SimpleString.toSimpleString(match)).setType(nodeType);

View File

@ -972,7 +972,7 @@
<xsd:attribute name="match" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
pattern for matching security roles against addresses; can use wildards
pattern for matching security roles against addresses; can use wildcards
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
@ -2075,7 +2075,7 @@
</xsd:complexType>
<xsd:complexType name="amqp-address-match-type">
<xsd:attribute name="match" type="xsd:string" use="optional">
<xsd:attribute name="address-match" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation>
address expression to match addresses
@ -2083,12 +2083,12 @@
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="queue-name" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation>
This is the exact queue name to be used.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:annotation>
<xsd:documentation>
This is the exact queue name to be used.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<xsd:complexType name="amqp-mirror-type">
<xsd:annotation>
@ -2097,15 +2097,6 @@
All events will be send towards this AMQP connection acting like a replica.
</xsd:documentation>
</xsd:annotation>
<!--
TODO: comment this out when we start supporting matching on mirror.
<xsd:attribute name="match" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
address expression to match addresses
</xsd:documentation>
</xsd:annotation>
</xsd:attribute> -->
<xsd:attribute name="message-acknowledgements" type="xsd:boolean" use="optional" default="true">
<xsd:annotation>
@ -3871,7 +3862,7 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:all>
<xsd:attribute name="match" type="xsd:string" use="required">

View File

@ -374,9 +374,9 @@
</cluster-connections>
<broker-connections>
<amqp-connection uri="tcp://test1:111" name="test1" retry-interval="333" reconnect-attempts="33" user="testuser" password="testpassword">
<sender match="TEST-SENDER" />
<receiver match="TEST-RECEIVER" />
<peer match="TEST-PEER"/>
<sender address-match="TEST-SENDER" />
<receiver address-match="TEST-RECEIVER" />
<peer address-match="TEST-PEER"/>
<receiver queue-name="TEST-WITH-QUEUE-NAME"/>
<mirror message-acknowledgements="false" queue-creation="false" source-mirror-address="TEST-REPLICA" queue-removal="false"/>
</amqp-connection>

View File

@ -614,7 +614,7 @@
<xsd:annotation>
<xsd:documentation>
A list of connections the broker will make towards other servers.
Currently the only connection type supported is amqp-connection
Currently the only connection type supported is amqpConnection
</xsd:documentation>
</xsd:annotation>
</xsd:element>
@ -972,7 +972,7 @@
<xsd:attribute name="match" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
pattern for matching security roles against addresses; can use wildards
pattern for matching security roles against addresses; can use wildcards
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
@ -2028,6 +2028,13 @@
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="auto-start" type="xsd:boolean" default="true">
<xsd:annotation>
<xsd:documentation>
should the broker connection be started when the server is started.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="reconnect-attempts" type="xsd:int" default="-1">
<xsd:annotation>
<xsd:documentation>
@ -2068,7 +2075,7 @@
</xsd:complexType>
<xsd:complexType name="amqp-address-match-type">
<xsd:attribute name="match" type="xsd:string" use="optional">
<xsd:attribute name="address-match" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation>
address expression to match addresses
@ -2090,15 +2097,6 @@
All events will be send towards this AMQP connection acting like a replica.
</xsd:documentation>
</xsd:annotation>
<!--
TODO: comment this out when we start supporting matching on mirror.
<xsd:attribute name="match" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
address expression to match addresses
</xsd:documentation>
</xsd:annotation>
</xsd:attribute> -->
<xsd:attribute name="message-acknowledgements" type="xsd:boolean" use="optional" default="true">
<xsd:annotation>
@ -3865,20 +3863,12 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="page-store-name" type="xsd:string" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the name of the page store to use, to allow the page store to coalesce for address hierarchies when wildcard routing is in play
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:all>
<xsd:attribute name="match" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
pattern for matching settings against addresses; can use wildcards
pattern for matching settings against addresses; can use wildards
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>

View File

@ -34,7 +34,7 @@ To define an AMQP broker connection, add an `<amqp-connection>` element within t
- `reconnect-attempts`: default is -1 meaning infinite
- `auto-start` : Should the broker connection start automatically with the broker. Default is `true`. If false you need to call a management operation to start it.
*Notice*: If you disable auto-start on the broker connection, the start of the broker connection will only happen after the management method `startBrokerConnection(connectionName)` is called on the ServerController.
*Notice:* If you disable auto-start on the broker connection, the start of the broker connection will only happen after the management method `startBrokerConnection(connectionName)` is called on the ServerController.
*Important*: The target endpoint needs permission for all operations that you configure. Therefore, If you are using a security manager, ensure that you perform the configured operations as a user with sufficient permissions.
@ -61,7 +61,7 @@ Both elements work like a message bridge. However, there is no additional overhe
You can configure senders or receivers for specific queues. You can also match senders and receivers to specific addresses or _sets_ of addresses, using wildcard expressions. When configuring a sender or receiver, you can set the following properties:
- `match`: Match the sender or receiver to a specific address or __set__ of addresses, using a wildcard expression
- `address-match`: Match the sender or receiver to a specific address or __set__ of addresses, using a wildcard expression
- `queue-name`: Configure the sender or receiver for a specific queue
@ -71,9 +71,9 @@ Using address expressions:
```xml
<broker-connections>
<amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker">
<sender match="queues.#"/>
<sender address-match="queues.#"/>
<!-- notice the local queues for remotequeues.# need to be created on this broker -->
<receiver match="remotequeues.#"/>
<receiver address-match="remotequeues.#"/>
</amqp-connection>
</broker-connections>
@ -90,7 +90,6 @@ Using address expressions:
</address>
</addresses>
```
Using queue names:
```xml
<broker-connections>
@ -112,14 +111,13 @@ Using queue names:
</anycast>
</address>
</addresses>
```
*Important*: You can match a receiver only to a local queue that already exists. Therefore, if you are using receivers, make sure that you pre-create the queue locally. Otherwise, the broker cannot match the remote queues and addresses.
*Important:* You can match a receiver only to a local queue that already exists. Therefore, if you are using receivers, make sure that you pre-create the queue locally. Otherwise, the broker cannot match the remote queues and addresses.
*Important*: Do not create a sender and a receiver to the same destination. This creates an infinite loop of sends and receives.
*Important:* Do not create a sender and a receiver to the same destination. This creates an infinite loop of sends and receives.
# Peers
## Peers
The broker can be configured as a peer which connects to the [Apache Qpid Dispatch Router](https://qpid.apache.org/components/dispatch-router/) and instructs it the broker it will act as a store-and-forward queue for a given AMQP waypoint address configured on the router. In this scenario, clients connect to a router to send and receive messages using a waypointed address, and the router routes these messages to or from the queue on the broker.
The peer configuration causes ActiveMQ Artemis to create a sender and receiver pair for each destination matched in the broker-connection configuration, with these carrying special configuration to let Qpid Dispatch know to collaborate with the broker. This replaces the traditional need of a router-initiated connection and auto-links.
@ -130,7 +128,7 @@ With a peer configuration, you have the same properties that you have on a sende
```xml
<broker-connections>
<amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-router">
<peer match="queues.#"/>
<peer address-match="queues.#"/>
</amqp-connection>
</broker-connections>
@ -163,7 +161,29 @@ For more information refer to the "brokered messaging" documentation for [Apache
*Important:* You do not need to configure the router with a connector or auto-links to communicate with the broker. The brokers peer configuration replaces these aspects of the router waypoint usage.
# Mirror
## Address Consideration
It is highly recommended that you keep `address name` and `queue name` the same, as when you use a queue with its distinct name (as in the following example), senders and receivers will always use the `address name` when creating the remote endpoint.
```xml
<broker-connections>
<amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker">
<sender address-match="queues.#"/>
</amqp-connection>
</broker-connections>
<addresses>
<address name="queues.A">
<anycast>
<queue name="distinctNameQueue.A"/>
</anycast>
</address>
</addresses>
```
In the above example the `broker connection` would create an AMQP sender towards "queues.A".
*Important:* To avoid confusion it is recommended that you keep the `address name` and `queue name` the same.
## Mirror
The mirror option on the broker connection can capture events from the broker and pass them over the wire to another broker. This enables you to capture multiple asynchronous replicas. The following types of events are captured:
* Message routing
@ -195,7 +215,7 @@ An example of a mirror configuration is shown below:
*Important*: A broker can mirror to multiple replicas (1 to many). However a replica broker can only have a single mirror source. Make sure you do not mirror multiple source brokers to a single replica broker.
## Pre existing messages
### Pre Existing Messages
The broker will not send pre existing messages through the mirror. So, If you add mirror to your configuration and the journal had pre existing messages these messages will not be sent.
## Broker Connection Stop and Disconnect
@ -203,7 +223,7 @@ Once you start the broker connection with a mirror the mirror events will always
It is possible to stop the broker connection with the operation stopBrokerConnection(connectionName) on the ServerControl, but it is only effective to disconnect the brokers, while the mirror events are always captured.
## Disaster & Recovery considerations
## Disaster & Recovery Considerations
As you use the mirror option to replicate data across datacenters, you have to take a few considerations:
* Currently we don't support quorums for activating the replica, so you have to manually control when your clients connect to the replica site.
@ -241,7 +261,7 @@ On the replicaBroker, add a disabled broker connection for failing back after a
<acceptor name="amqp">tcp://0.0.0.0:6700?autoStart=true;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true;autoStart=false</acceptor>
</acceptors>
<broker-connections>
<amqp-connection uri="tcp://sourceBroker:6700" name="sourceBroker" auto-start="false">
<amqp-connection uri="tcp://sourceBroker:6700" name="failbackBroker" auto-start="false">
<mirror message-acknowledgements="true"/>
</amqp-connection>
</broker-connections>
@ -249,9 +269,9 @@ On the replicaBroker, add a disabled broker connection for failing back after a
After a failure has occurred, you can use a management operation start on the acceptor:
- AccetorControl.start();
- AcceptorControl.start();
And you can call startBrokerConnection to enable the failback towards the live site:
- ActiveMQServerControl.startBrokerConnection("sourceBroker")
- ActiveMQServerControl.startBrokerConnection("failbackBroker")

View File

@ -54,7 +54,7 @@ under the License.
<broker-connections>
<amqp-connection uri="tcp://localhost:5672" name="receiver" retry-interval="100">
<!-- This will create one receiver for every queue matching this address expression -->
<receiver match="#"/>
<receiver address-match="#"/>
</amqp-connection>
</broker-connections>

View File

@ -54,7 +54,7 @@ under the License.
<broker-connections>
<amqp-connection uri="tcp://localhost:5771" name="sender" retry-interval="100">
<!-- This will create one sender for every queue matching this address expression -->
<sender match="#"/>
<sender address-match="#"/>
</amqp-connection>
</broker-connections>

View File

@ -37,7 +37,7 @@ under the License.
<broker-connections>
<amqp-connection uri="tcp://localhost:5772?sslEnabled=true;trustStorePath=activemq.example.truststore;trustStorePassword=activemqexample" name="otherSSL" retry-interval="1000">
<sender match="#"/>
<sender address-match="#"/>
</amqp-connection>
</broker-connections>

View File

@ -99,7 +99,7 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport {
private void internalMultipleQueues(boolean useMatching, boolean distinctNaming) throws Exception {
final int numberOfMessages = 100;
final int numberOfQueues = 10;
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:24621").setRetryInterval(10).setReconnectAttempts(-1);
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:24622").setRetryInterval(10).setReconnectAttempts(-1);
if (useMatching) {
amqpConnection.addElement(new AMQPBrokerConnectionElement().setMatchAddress("queue.#").setType(AMQPBrokerConnectionAddressType.PEER));
} else {

View File

@ -25,21 +25,10 @@
#outputFile: /tmp/qdrouterd.log
#}
# The broker connects into this port
listener {
saslMechanisms: ANONYMOUS
host: 0.0.0.0
role: route-container
linkCapacity: 1123
authenticatePeer: no
port: 24621
}
# Clients connect to this port
listener {
saslMechanisms: ANONYMOUS
host: 0.0.0.0
linkCapacity: 555
role: normal
authenticatePeer: no
port: 24622