ARTEMIS-4251 Support CORE client failover to other live servers

Improve the CORE client failover connecting to other live servers when all
reconnect attempts fails, i.e. in a cluster composed of 2 live servers,
when the server to which the CORE client is connected goes down the CORE
client should reconnect its sessions to the other liver broker.
This commit is contained in:
Domenico Francesco Bruscino 2023-04-22 16:47:17 +02:00 committed by clebertsuconic
parent 2f5463c960
commit bd3c057559
16 changed files with 540 additions and 225 deletions

View File

@ -44,6 +44,7 @@ public class ServerLocatorConfig {
public long maxRetryInterval = ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL; public long maxRetryInterval = ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL;
public int reconnectAttempts = ActiveMQClient.DEFAULT_RECONNECT_ATTEMPTS; public int reconnectAttempts = ActiveMQClient.DEFAULT_RECONNECT_ATTEMPTS;
public int initialConnectAttempts = ActiveMQClient.INITIAL_CONNECT_ATTEMPTS; public int initialConnectAttempts = ActiveMQClient.INITIAL_CONNECT_ATTEMPTS;
public int failoverAttempts = ActiveMQClient.DEFAULT_FAILOVER_ATTEMPTS;
public int initialMessagePacketSize = ActiveMQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE; public int initialMessagePacketSize = ActiveMQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
public boolean cacheLargeMessagesClient = ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT; public boolean cacheLargeMessagesClient = ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
public boolean compressLargeMessage = ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES; public boolean compressLargeMessage = ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
@ -80,6 +81,7 @@ public class ServerLocatorConfig {
maxRetryInterval = locator.maxRetryInterval; maxRetryInterval = locator.maxRetryInterval;
reconnectAttempts = locator.reconnectAttempts; reconnectAttempts = locator.reconnectAttempts;
initialConnectAttempts = locator.initialConnectAttempts; initialConnectAttempts = locator.initialConnectAttempts;
failoverAttempts = locator.failoverAttempts;
initialMessagePacketSize = locator.initialMessagePacketSize; initialMessagePacketSize = locator.initialMessagePacketSize;
useTopologyForLoadBalancing = locator.useTopologyForLoadBalancing; useTopologyForLoadBalancing = locator.useTopologyForLoadBalancing;
} }

View File

@ -113,6 +113,8 @@ public final class ActiveMQClient {
public static final int INITIAL_CONNECT_ATTEMPTS = 1; public static final int INITIAL_CONNECT_ATTEMPTS = 1;
public static final int DEFAULT_FAILOVER_ATTEMPTS = 0;
@Deprecated @Deprecated
public static final boolean DEFAULT_FAILOVER_ON_INITIAL_CONNECTION = false; public static final boolean DEFAULT_FAILOVER_ON_INITIAL_CONNECTION = false;

View File

@ -653,6 +653,21 @@ public interface ServerLocator extends AutoCloseable {
*/ */
int getInitialConnectAttempts(); int getInitialConnectAttempts();
/**
* Sets the maximum number of failover attempts to establish a connection to other live servers after a connection failure.
* <p>
* Value must be -1 (to retry infinitely), 0 (to never retry connection) or greater than 0.
*
* @param attempts maximum number of failover attempts after a connection failure
* @return this ServerLocator
*/
ServerLocator setFailoverAttempts(int attempts);
/**
* @return the number of failover attempts after a connection failure.
*/
int getFailoverAttempts();
/** /**
* Returns true if the client will automatically attempt to connect to the backup server if the initial * Returns true if the client will automatically attempt to connect to the backup server if the initial
* connection to the live server fails * connection to the live server fails

View File

@ -75,6 +75,7 @@ import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.function.BiPredicate;
public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener { public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener {
@ -132,6 +133,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
private int reconnectAttempts; private int reconnectAttempts;
private int failoverAttempts;
private final Set<SessionFailureListener> listeners = new ConcurrentHashSet<>(); private final Set<SessionFailureListener> listeners = new ConcurrentHashSet<>();
private final Set<FailoverEventListener> failoverListeners = new ConcurrentHashSet<>(); private final Set<FailoverEventListener> failoverListeners = new ConcurrentHashSet<>();
@ -239,6 +242,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
this.reconnectAttempts = reconnectAttempts; this.reconnectAttempts = reconnectAttempts;
this.failoverAttempts = locatorConfig.failoverAttempts;
this.scheduledThreadPool = scheduledThreadPool; this.scheduledThreadPool = scheduledThreadPool;
this.threadPool = threadPool; this.threadPool = threadPool;
@ -640,7 +645,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
// failoverLock // failoverLock
// until failover is complete // until failover is complete
if (reconnectAttempts != 0) { if (reconnectAttempts != 0 || failoverAttempts != 0) {
if (clientProtocolManager.cleanupBeforeFailover(me)) { if (clientProtocolManager.cleanupBeforeFailover(me)) {
@ -673,33 +678,96 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
sessionsToFailover = new HashSet<>(sessions); sessionsToFailover = new HashSet<>(sessions);
} }
// Notify sessions before failover.
for (ClientSessionInternal session : sessionsToFailover) { for (ClientSessionInternal session : sessionsToFailover) {
session.preHandleFailover(connection); session.preHandleFailover(connection);
} }
boolean allSessionReconnected = false;
int failedReconnectSessionsCounter = 0; // Try to reconnect to the current connector pair.
do { // Before ARTEMIS-4251 ClientSessionFactoryImpl only tries to reconnect to the current connector pair.
allSessionReconnected = reconnectSessions(sessionsToFailover, oldConnection, reconnectAttempts, me); int reconnectRetries = 0;
if (oldConnection != null) { boolean sessionsReconnected = false;
oldConnection.destroy(); BiPredicate<Boolean, Integer> reconnectRetryPredicate =
(reconnected, retries) -> clientProtocolManager.isAlive() &&
!reconnected && (reconnectAttempts == -1 || retries < reconnectAttempts);
while (reconnectRetryPredicate.test(sessionsReconnected, reconnectRetries)) {
int remainingReconnectRetries = reconnectAttempts == -1 ? -1 : reconnectAttempts - reconnectRetries;
reconnectRetries += getConnectionWithRetry(remainingReconnectRetries, oldConnection);
if (connection != null) {
sessionsReconnected = reconnectSessions(sessionsToFailover, oldConnection, me);
if (!sessionsReconnected) {
if (oldConnection != null) {
oldConnection.destroy();
}
oldConnection = connection;
connection = null;
}
} }
if (!allSessionReconnected) { reconnectRetries++;
failedReconnectSessionsCounter++; if (reconnectRetryPredicate.test(sessionsReconnected, reconnectRetries)) {
oldConnection = connection; waitForRetry(retryInterval);
connection = null; }
}
// Wait for retry when the connection is established but not all session are reconnected.
if ((reconnectAttempts == -1 || failedReconnectSessionsCounter < reconnectAttempts) && oldConnection != null) { // Try to connect to other connector pairs.
waitForRetry(retryInterval); // After ARTEMIS-4251 ClientSessionFactoryImpl tries to connect to
// other connector pairs when reconnection to the current connector pair fails.
int connectorsCount = 0;
int failoverRetries = 0;
long failoverRetryInterval = retryInterval;
Pair<TransportConfiguration, TransportConfiguration> connectorPair;
BiPredicate<Boolean, Integer> failoverRetryPredicate =
(reconnected, retries) -> clientProtocolManager.isAlive() &&
!reconnected && (failoverAttempts == -1 || retries < failoverAttempts);
while (failoverRetryPredicate.test(sessionsReconnected, failoverRetries)) {
connectorsCount++;
connectorPair = serverLocator.selectNextConnectorPair();
if (connectorPair != null) {
connectorConfig = connectorPair.getA();
currentConnectorConfig = connectorPair.getA();
if (connectorPair.getB() != null) {
backupConnectorConfig = connectorPair.getB();
}
getConnection();
}
if (connection != null) {
sessionsReconnected = reconnectSessions(sessionsToFailover, oldConnection, me);
if (!sessionsReconnected) {
if (oldConnection != null) {
oldConnection.destroy();
}
oldConnection = connection;
connection = null;
}
}
if (connectorsCount >= serverLocator.getConnectorsSize()) {
connectorsCount = 0;
failoverRetries++;
if (failoverRetryPredicate.test(false, failoverRetries)) {
waitForRetry(failoverRetryInterval);
failoverRetryInterval = getNextRetryInterval(failoverRetryInterval);
} }
} }
} }
while ((reconnectAttempts == -1 || failedReconnectSessionsCounter < reconnectAttempts) && !allSessionReconnected);
// Notify sessions after failover.
for (ClientSessionInternal session : sessionsToFailover) { for (ClientSessionInternal session : sessionsToFailover) {
session.postHandleFailover(connection, allSessionReconnected); session.postHandleFailover(connection, sessionsReconnected);
} }
if (oldConnection != null) { if (oldConnection != null) {
@ -830,15 +898,12 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
*/ */
private boolean reconnectSessions(final Set<ClientSessionInternal> sessionsToFailover, private boolean reconnectSessions(final Set<ClientSessionInternal> sessionsToFailover,
final RemotingConnection oldConnection, final RemotingConnection oldConnection,
final int reconnectAttempts,
final ActiveMQException cause) { final ActiveMQException cause) {
getConnectionWithRetry(reconnectAttempts, oldConnection);
if (connection == null) { if (connection == null) {
if (!clientProtocolManager.isAlive()) if (!clientProtocolManager.isAlive())
ActiveMQClientLogger.LOGGER.failedToConnectToServer(); ActiveMQClientLogger.LOGGER.failedToConnectToServer();
return true; return false;
} }
List<FailureListener> oldListeners = oldConnection.getFailureListeners(); List<FailureListener> oldListeners = oldConnection.getFailureListeners();
@ -874,9 +939,9 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
return !sessionFailoverError; return !sessionFailoverError;
} }
private void getConnectionWithRetry(final int reconnectAttempts, RemotingConnection oldConnection) { private int getConnectionWithRetry(final int reconnectAttempts, RemotingConnection oldConnection) {
if (!clientProtocolManager.isAlive()) if (!clientProtocolManager.isAlive())
return; return 0;
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("getConnectionWithRetry::{} with retryInterval = {} multiplier = {}", logger.trace("getConnectionWithRetry::{} with retryInterval = {} multiplier = {}",
reconnectAttempts, retryInterval, retryIntervalMultiplier, new Exception("trace")); reconnectAttempts, retryInterval, retryIntervalMultiplier, new Exception("trace"));
@ -897,7 +962,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
((CoreRemotingConnection)connection).setChannelVersion(((CoreRemotingConnection)oldConnection).getChannelVersion()); ((CoreRemotingConnection)connection).setChannelVersion(((CoreRemotingConnection)oldConnection).getChannelVersion());
} }
logger.debug("Reconnection successful"); logger.debug("Reconnection successful");
return; return count;
} else { } else {
// Failed to get connection // Failed to get connection
@ -909,7 +974,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
ActiveMQClientLogger.LOGGER.failedToConnectToServer(reconnectAttempts); ActiveMQClientLogger.LOGGER.failedToConnectToServer(reconnectAttempts);
} }
return; return count;
} }
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
@ -917,22 +982,28 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
} }
if (waitForRetry(interval)) if (waitForRetry(interval))
return; return count;
// Exponential back-off interval = getNextRetryInterval(interval);
long newInterval = (long) (interval * retryIntervalMultiplier);
if (newInterval > maxRetryInterval) {
newInterval = maxRetryInterval;
}
interval = newInterval;
} else { } else {
logger.debug("Could not connect to any server. Didn't have reconnection configured on the ClientSessionFactory"); logger.debug("Could not connect to any server. Didn't have reconnection configured on the ClientSessionFactory");
return; return count;
} }
} }
} }
return count;
}
private long getNextRetryInterval(long retryInterval) {
// Exponential back-off
long nextRetryInterval = (long) (retryInterval * retryIntervalMultiplier);
if (nextRetryInterval > maxRetryInterval) {
nextRetryInterval = maxRetryInterval;
}
return nextRetryInterval;
} }
@Override @Override

View File

@ -440,6 +440,15 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
clusterTransportConfiguration = locator.clusterTransportConfiguration; clusterTransportConfiguration = locator.clusterTransportConfiguration;
} }
private boolean useInitConnector() {
return !config.useTopologyForLoadBalancing || !receivedTopology || topologyArray == null || topologyArray.length == 0;
}
@Override
public Pair<TransportConfiguration, TransportConfiguration> selectNextConnectorPair() {
return selectConnector(useInitConnector());
}
private synchronized Pair<TransportConfiguration, TransportConfiguration> selectConnector(boolean useInitConnector) { private synchronized Pair<TransportConfiguration, TransportConfiguration> selectConnector(boolean useInitConnector) {
Pair<TransportConfiguration, TransportConfiguration>[] usedTopology; Pair<TransportConfiguration, TransportConfiguration>[] usedTopology;
@ -470,7 +479,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
} }
} }
private int getConnectorsSize() { @Override
public int getConnectorsSize() {
Pair<TransportConfiguration, TransportConfiguration>[] usedTopology; Pair<TransportConfiguration, TransportConfiguration>[] usedTopology;
flushTopology(); flushTopology();
@ -673,7 +683,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
int attempts = 0; int attempts = 0;
boolean topologyArrayTried = !config.useTopologyForLoadBalancing || topologyArray == null || topologyArray.length == 0; boolean topologyArrayTried = !config.useTopologyForLoadBalancing || topologyArray == null || topologyArray.length == 0;
boolean staticTried = false; boolean staticTried = false;
boolean shouldTryStatic = !config.useTopologyForLoadBalancing || !receivedTopology || topologyArray == null || topologyArray.length == 0; boolean shouldTryStatic = useInitConnector();
while (retry && !isClosed()) { while (retry && !isClosed()) {
retry = false; retry = false;
@ -1177,6 +1187,18 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
return config.initialConnectAttempts; return config.initialConnectAttempts;
} }
@Override
public ServerLocatorImpl setFailoverAttempts(int attempts) {
checkWrite();
this.config.failoverAttempts = attempts;
return this;
}
@Override
public int getFailoverAttempts() {
return config.failoverAttempts;
}
@Deprecated @Deprecated
@Override @Override
public boolean isFailoverOnInitialConnection() { public boolean isFailoverOnInitialConnection() {

View File

@ -87,4 +87,8 @@ public interface ServerLocatorInternal extends ServerLocator {
ClientProtocolManager newProtocolManager(); ClientProtocolManager newProtocolManager();
boolean isConnectable(); boolean isConnectable();
int getConnectorsSize();
Pair<TransportConfiguration, TransportConfiguration> selectNextConnectorPair();
} }

View File

@ -59,7 +59,7 @@
* [Broker Plugins](broker-plugins.md) * [Broker Plugins](broker-plugins.md)
* [Resource Limits](resource-limits.md) * [Resource Limits](resource-limits.md)
* [The JMS Bridge](jms-bridge.md) * [The JMS Bridge](jms-bridge.md)
* [Client Reconnection and Session Reattachment](client-reconnection.md) * [Client Failover](client-failover.md)
* [Diverting and Splitting Message Flows](diverts.md) * [Diverting and Splitting Message Flows](diverts.md)
* [Core Bridges](core-bridges.md) * [Core Bridges](core-bridges.md)
* [Transformers](transformers.md) * [Transformers](transformers.md)

View File

@ -0,0 +1,161 @@
# Client Failover
Apache ActiveMQ Artemis clients can be configured to automatically
[reconnect to the same server](#reconnect-to-the-same-server),
[reconnect to the backup server](#reconnect-to-the-backup-server) or
[reconnect to other live servers](#reconnect-to-other-live-servers) in the event
that a failure is detected in the connection between the client and the server.
The clients detect connection failure when they have not received any packets
from the server within the time given by `client-failure-check-period` as explained
in section [Detecting Dead Connections](connection-ttl.md).
## Reconnect to the same server
Set `reconnectAttempts` to any non-zero value to reconnect to the same server,
for further details see
[Reconnection and failover attributes](#client-failover-attributes).
If the disconnection was due to some transient failure such as a temporary
network outage and the target server was not restarted, then the sessions will
still exist on the server, assuming the client hasn't been disconnected for
more than [connection-ttl](connection-ttl.md)
In this scenario, the client sessions will be automatically re-attached to the
server sessions after the reconnection. This is done 100% transparently and the
client can continue exactly as if nothing had happened.
The way this works is as follows:
As Apache ActiveMQ Artemis clients send commands to their servers they store
each sent command in an in-memory buffer. In the case that connection failure
occurs and the client subsequently reattaches to the same server, as part of
the reattachment protocol the server informs the client during reattachment
with the id of the last command it successfully received from that client.
If the client has sent more commands than were received before failover it can
replay any sent commands from its buffer so that the client and server can
reconcile their states.Ac
The size of this buffer is configured with the `confirmationWindowSize`
parameter on the connection URL. When the server has received
`confirmationWindowSize` bytes of commands and processed them it will send back
a command confirmation to the client, and the client can then free up space in
the buffer.
The window is specified in bytes.
Setting this parameter to `-1` disables any buffering and prevents any
re-attachment from occurring, forcing reconnect instead. The default value for
this parameter is `-1`. (Which means by default no auto re-attachment will
occur)
## Reconnect to the backup server
Set `reconnectAttempts` to any non-zero value and `ha` to `true` to reconnect
to the back server, for further details see
[Reconnection and failover attributes](#client-failover-attributes).
The clients can be configured to discover the list of live-backup
server groups in a number of different ways. They can be configured
explicitly or probably the most common way of doing this is to use
*server discovery* for the client to automatically discover the list.
For full details on how to configure server discovery, please see [Clusters](clusters.md).
Alternatively, the clients can explicitly connect to a specific server
and download the current servers and backups see [Clusters](clusters.md).
By default, failover will only occur after at least one connection has
been made to the live server. In other words, by default, failover will
not occur if the client fails to make an initial connection to the live
server - in this case it will simply retry connecting to the live server
according to the reconnect-attempts property and fail after this number
of attempts.
## Reconnect to other live servers
Set `failoverAttempts` to any non-zero value to reconnect to other live servers,
for further details see
[Reconnection and failover attributes](#client-failover-attributes).
If `reconnectAttempts` value is not zero then the client will try to reconnect
to other live servers only after all attempts to
[reconnect to the same server](#reconnect-to-the-same-server) or
[reconnect to the backup server](#reconnect-to-the-backup-server) fail.
## Session reconnection
When clients [reconnect to the same server](#reconnect-to-the-same-server)
after a restart, [reconnect to the backup server](#reconnect-to-the-backup-server)
or [reconnect to other live servers](#reconnect-to-other-live-servers) any sessions
will no longer exist on the server and it won't be possible to 100% transparently
re-attach to them. In this case, any sessions and consumers on the client will be
automatically recreated on the server.
Client reconnection is also used internally by components such as core bridges
to allow them to reconnect to their target servers.
## Failing over on the initial connection
Since the client does not learn about the full topology until after the
first connection is made there is a window where it does not know about
the backup. If a failure happens at this point the client can only try
reconnecting to the original live server. To configure how many attempts
the client will make you can set the URL parameter `initialConnectAttempts`.
The default for this is `0`, that is try only once. Once the number of
attempts has been made an exception will be thrown.
For examples of automatic failover with transacted and non-transacted
JMS sessions, please see [the examples](examples.md) chapter.
## Reconnection and failover attributes
Client reconnection and failover is configured using the following parameters:
- `retryInterval`. This optional parameter determines the period in
milliseconds between subsequent reconnection attempts, if the connection to
the target server has failed. The default value is `2000` milliseconds.
- `retryIntervalMultiplier`. This optional parameter determines a multiplier
to apply to the time since the last retry to compute the time to the next
retry.
This allows you to implement an *exponential backoff* between retry attempts.
Let's take an example:
If we set `retryInterval` to `1000` ms and we set `retryIntervalMultiplier`
to `2.0`, then, if the first reconnect attempt fails, we will wait `1000` ms
then `2000` ms then `4000` ms between subsequent reconnection attempts.
The default value is `1.0` meaning each reconnect attempt is spaced at equal
intervals.
- `maxRetryInterval`. This optional parameter determines the maximum retry
interval that will be used. When setting `retryIntervalMultiplier` it would
otherwise be possible that subsequent retries exponentially increase to
ridiculously large values. By setting this parameter you can set an upper limit
on that value. The default value is `2000` milliseconds.
- `ha`. This optional parameter determines weather the client will try to
reconnect to the backup node when the live node is not reachable.
The default value is `false`.
For more information on HA, please see [High Availability and Failover](ha.md).
- `reconnectAttempts`. This optional parameter determines the total number of
reconnect attempts to make to the current live/backup pair before giving up.
A value of `-1` signifies an unlimited number of attempts.
The default value is `0`.
- `failoverAttempts`. This optional parameter determines the total number of
failover attempts to make after a reconnection failure before giving up and
shutting down. A value of `-1` signifies an unlimited number of attempts.
The default value is `0`.
All of these parameters are set on the URL used to connect to the broker.
If your client does manage to reconnect but the session is no longer available
on the server, for instance if the server has been restarted or it has timed
out, then the client won't be able to re-attach, and any `ExceptionListener` or
`FailureListener` instances registered on the connection or session will be
called.
## ExceptionListeners and SessionFailureListeners
Please note, that when a client reconnects or re-attaches, any registered JMS
`ExceptionListener` or core API `SessionFailureListener` will be called.

View File

@ -1,107 +0,0 @@
# Client Reconnection and Session Reattachment
Apache ActiveMQ Artemis clients can be configured to automatically reconnect or
re-attach to the server in the event that a failure is detected in the
connection between the client and the server.
## 100% Transparent session re-attachment
If the disconnection was due to some transient failure such as a temporary
network outage and the target server was not restarted, then the sessions will
still exist on the server, assuming the client hasn't been disconnected for
more than [connection-ttl](connection-ttl.md)
In this scenario, Apache ActiveMQ Artemis will automatically re-attach the
client sessions to the server sessions when the connection reconnects. This is
done 100% transparently and the client can continue exactly as if nothing had
happened.
The way this works is as follows:
As Apache ActiveMQ Artemis clients send commands to their servers they store
each sent command in an in-memory buffer. In the case that connection failure
occurs and the client subsequently reattaches to the same server, as part of
the reattachment protocol the server informs the client during reattachment
with the id of the last command it successfully received from that client.
If the client has sent more commands than were received before failover it can
replay any sent commands from its buffer so that the client and server can
reconcile their states.Ac
The size of this buffer is configured with the `confirmationWindowSize`
parameter on the connection URL. When the server has received
`confirmationWindowSize` bytes of commands and processed them it will send back
a command confirmation to the client, and the client can then free up space in
the buffer.
The window is specified in bytes.
Setting this parameter to `-1` disables any buffering and prevents any
re-attachment from occurring, forcing reconnect instead. The default value for
this parameter is `-1`. (Which means by default no auto re-attachment will
occur)
## Session reconnection
Alternatively, the server might have actually been restarted after crashing or
being stopped. In this case any sessions will no longer exist on the server and
it won't be possible to 100% transparently re-attach to them.
In this case, the Apache ActiveMQ Artemis client will automatically reconnect
and *recreate* any sessions and consumers on the server corresponding to the
sessions and consumers on the client. This process is exactly the same as what
happens during failover onto a backup server.
Client reconnection is also used internally by components such as core bridges
to allow them to reconnect to their target servers.
Please see the section on failover [Automatic Client Failover](ha.md) to get a
full understanding of how transacted and non-transacted sessions are
reconnected during failover/reconnect and what you need to do to maintain *once
and only once* delivery guarantees.
## Configuring reconnection/reattachment attributes
Client reconnection is configured using the following parameters:
- `retryInterval`. This optional parameter determines the period in
milliseconds between subsequent reconnection attempts, if the connection to
the target server has failed. The default value is `2000` milliseconds.
- `retryIntervalMultiplier`. This optional parameter determines a multiplier
to apply to the time since the last retry to compute the time to the next
retry.
This allows you to implement an *exponential backoff* between retry attempts.
Let's take an example:
If we set `retryInterval` to `1000` ms and we set `retryIntervalMultiplier`
to `2.0`, then, if the first reconnect attempt fails, we will wait `1000` ms
then `2000` ms then `4000` ms between subsequent reconnection attempts.
The default value is `1.0` meaning each reconnect attempt is spaced at equal
intervals.
- `maxRetryInterval`. This optional parameter determines the maximum retry
interval that will be used. When setting `retryIntervalMultiplier` it would
otherwise be possible that subsequent retries exponentially increase to
ridiculously large values. By setting this parameter you can set an upper limit
on that value. The default value is `2000` milliseconds.
- `reconnectAttempts`. This optional parameter determines the total number of
reconnect attempts to make before giving up and shutting down. A value of
`-1` signifies an unlimited number of attempts. The default value is `0`.
All of these parameters are set on the URL used to connect to the broker.
If your client does manage to reconnect but the session is no longer available
on the server, for instance if the server has been restarted or it has timed
out, then the client won't be able to re-attach, and any `ExceptionListener` or
`FailureListener` instances registered on the connection or session will be
called.
## ExceptionListeners and SessionFailureListeners
Please note, that when a client reconnects or re-attaches, any registered JMS
`ExceptionListener` or core API `SessionFailureListener` will be called.

View File

@ -362,7 +362,7 @@ Name | Description | Default
[use-duplicate-detection](clusters.md)| should duplicate detection headers be inserted in forwarded messages? | `true` [use-duplicate-detection](clusters.md)| should duplicate detection headers be inserted in forwarded messages? | `true`
[message-load-balancing](clusters.md) | how should messages be load balanced? | `OFF` [message-load-balancing](clusters.md) | how should messages be load balanced? | `OFF`
[max-hops](clusters.md)| maximum number of hops cluster topology is propagated. | 1 [max-hops](clusters.md)| maximum number of hops cluster topology is propagated. | 1
[confirmation-window-size](client-reconnection.md#client-reconnection-and-session-reattachment)| The size (in bytes) of the window used for confirming data from the server connected to. | 1048576 [confirmation-window-size](client-failover.md#reconnect-to-the-same-server)| The size (in bytes) of the window used for confirming data from the server connected to. | 1048576
[producer-window-size](clusters.md)| Flow Control for the Cluster connection bridge. | -1 (disabled) [producer-window-size](clusters.md)| Flow Control for the Cluster connection bridge. | -1 (disabled)
[call-failover-timeout](clusters.md#configuring-cluster-connections)| How long to wait for a reply if in the middle of a fail-over. -1 means wait forever. | -1 [call-failover-timeout](clusters.md#configuring-cluster-connections)| How long to wait for a reply if in the middle of a fail-over. -1 means wait forever. | -1
[notification-interval](clusters.md) | how often the cluster connection will notify the cluster of its existence right after joining the cluster. | 1000 [notification-interval](clusters.md) | how often the cluster connection will notify the cluster of its existence right after joining the cluster. | 1000

View File

@ -151,8 +151,8 @@ Let's take a look at all the parameters in turn:
- `confirmation-window-size`. This optional parameter determines the - `confirmation-window-size`. This optional parameter determines the
`confirmation-window-size` to use for the connection used to forward messages `confirmation-window-size` to use for the connection used to forward messages
to the target node. This attribute is described in section [Reconnection and to the target node. This attribute is described in section
Session Reattachment](client-reconnection.md) [Client failover attributes](client-failover.md#client-failover-attributes)
> **Warning** > **Warning**
> >

View File

@ -957,24 +957,7 @@ transactions are there for the client when it reconnects. The normal
reconnect settings apply when the client is reconnecting so these should reconnect settings apply when the client is reconnecting so these should
be high enough to deal with the time needed to scale down. be high enough to deal with the time needed to scale down.
## Failover Modes ## Client Failover
Apache ActiveMQ Artemis defines two types of client failover:
- Automatic client failover
- Application-level client failover
Apache ActiveMQ Artemis also provides 100% transparent automatic reattachment of
connections to the same server (e.g. in case of transient network
problems). This is similar to failover, except it is reconnecting to the
same server and is discussed in [Client Reconnection and Session Reattachment](client-reconnection.md)
During failover, if the client has consumers on any non persistent or
temporary queues, those queues will be automatically recreated on the backup node,
since the backup node will not have any knowledge of non persistent queues.
### Automatic Client Failover
Apache ActiveMQ Artemis clients can be configured to receive knowledge of all live and Apache ActiveMQ Artemis clients can be configured to receive knowledge of all live and
backup servers, so that in event of connection failure at the client - backup servers, so that in event of connection failure at the client -
@ -982,45 +965,7 @@ live server connection, the client will detect this and reconnect to the
backup server. The backup server will then automatically recreate any backup server. The backup server will then automatically recreate any
sessions and consumers that existed on each connection before failover, sessions and consumers that existed on each connection before failover,
thus saving the user from having to hand-code manual reconnection logic. thus saving the user from having to hand-code manual reconnection logic.
For further details see [Client Failover](client-failover.md)
Apache ActiveMQ Artemis clients detect connection failure when it has not received
packets from the server within the time given by
`client-failure-check-period` as explained in section [Detecting Dead Connections](connection-ttl.md). If the client
does not receive data in good time, it will assume the connection has
failed and attempt failover. Also if the socket is closed by the OS,
usually if the server process is killed rather than the machine itself
crashing, then the client will failover straight away.
Apache ActiveMQ Artemis clients can be configured to discover the list of live-backup
server groups in a number of different ways. They can be configured
explicitly or probably the most common way of doing this is to use
*server discovery* for the client to automatically discover the list.
For full details on how to configure server discovery, please see [Clusters](clusters.md).
Alternatively, the clients can explicitly connect to a specific server
and download the current servers and backups see [Clusters](clusters.md).
To enable automatic client failover, the client must be configured to
allow non-zero reconnection attempts (as explained in [Client Reconnection and Session Reattachment](client-reconnection.md)).
By default failover will only occur after at least one connection has
been made to the live server. In other words, by default, failover will
not occur if the client fails to make an initial connection to the live
server - in this case it will simply retry connecting to the live server
according to the reconnect-attempts property and fail after this number
of attempts.
#### Failing over on the Initial Connection
Since the client does not learn about the full topology until after the
first connection is made there is a window where it does not know about
the backup. If a failure happens at this point the client can only try
reconnecting to the original live server. To configure how many attempts
the client will make you can set the URL parameter `initialConnectAttempts`.
The default for this is `0`, that is try only once. Once the number of
attempts has been made an exception will be thrown.
For examples of automatic failover with transacted and non-transacted
JMS sessions, please see [the examples](examples.md) chapter.
#### A Note on Server Replication #### A Note on Server Replication

View File

@ -112,7 +112,7 @@ successfully reached the server.
The window size for send acknowledgements is determined by the The window size for send acknowledgements is determined by the
confirmation-window-size parameter on the connection factory or client confirmation-window-size parameter on the connection factory or client
session factory. Please see [Client Reconnection and Session Reattachment](client-reconnection.md) for more info on this. session factory. Please see [Client Failover](client-failover.md) for more info on this.
To use the feature using the core API, you implement the interface To use the feature using the core API, you implement the interface
`org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler` and set `org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler` and set

View File

@ -28,6 +28,7 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
@ -36,6 +37,7 @@ import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection; import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQSession; import org.apache.activemq.artemis.jms.client.ActiveMQSession;
@ -98,6 +100,7 @@ public class ClientConnectorFailoverTest extends StaticClusterWithBackupFailover
} }
crashAndWaitForFailure(getServer(serverIdBeforeCrash), clientSession); crashAndWaitForFailure(getServer(serverIdBeforeCrash), clientSession);
Assert.assertEquals(backupConnector.getName(), sessionFactory.getConnectorConfiguration().getName()); Assert.assertEquals(backupConnector.getName(), sessionFactory.getConnectorConfiguration().getName());
Assert.assertEquals(TEST_PARAM, sessionFactory.getConnectorConfiguration().getExtraParams().get(TEST_PARAM)); Assert.assertEquals(TEST_PARAM, sessionFactory.getConnectorConfiguration().getExtraParams().get(TEST_PARAM));
@ -119,6 +122,188 @@ public class ClientConnectorFailoverTest extends StaticClusterWithBackupFailover
} }
} }
@Test
public void testConsumerAfterFailoverWithRedistribution() throws Exception {
setupCluster();
AddressSettings testAddressSettings = new AddressSettings().setRedistributionDelay(0);
for (int i : getServerIDs()) {
getServer(i).getAddressSettingsRepository().addMatch(QUEUES_TESTADDRESS, testAddressSettings);
}
startServers(getLiveServerIDs());
startServers(getBackupServerIDs());
for (int i : getLiveServerIDs()) {
waitForTopology(servers[i], 3, 3);
}
for (int i : getBackupServerIDs()) {
waitForFailoverTopology(i, 0, 1, 2);
}
for (int i : getLiveServerIDs()) {
setupSessionFactory(i, i + 3, isNetty(), false);
createQueue(i, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
}
List<TransportConfiguration> transportConfigList = new ArrayList<>();
for (int i : getLiveServerIDs()) {
Map<String, Object> params = generateParams(i, isNetty());
TransportConfiguration serverToTC = createTransportConfiguration("node" + i, isNetty(), false, params);
serverToTC.getExtraParams().put(TEST_PARAM, TEST_PARAM);
transportConfigList.add(serverToTC);
}
TransportConfiguration[] transportConfigs = transportConfigList.toArray(new TransportConfiguration[transportConfigList.size()]);
try (ServerLocator serverLocator = new ServerLocatorImpl(false, transportConfigs)) {
serverLocator.setFailoverAttempts(3);
serverLocator.setReconnectAttempts(0);
serverLocator.setUseTopologyForLoadBalancing(false);
try (ClientSessionFactory sessionFactory = serverLocator.createSessionFactory()) {
try (ClientSession clientSession = sessionFactory.createSession()) {
clientSession.start();
int serverIdBeforeCrash = Integer.parseInt(sessionFactory.
getConnectorConfiguration().getName().substring(4));
QueueControl testQueueControlBeforeCrash = (QueueControl)getServer(serverIdBeforeCrash).
getManagementService().getResource(ResourceNames.QUEUE + QUEUE_NAME);
Assert.assertEquals(0, testQueueControlBeforeCrash.getMessageCount());
try (ClientProducer clientProducer = clientSession.createProducer(QUEUES_TESTADDRESS)) {
clientProducer.send(clientSession.createMessage(true));
clientProducer.send(clientSession.createMessage(true));
}
Assert.assertEquals(2, testQueueControlBeforeCrash.getMessageCount());
try (ClientConsumer clientConsumer = clientSession.createConsumer(QUEUE_NAME)) {
ClientMessage messageBeforeCrash = clientConsumer.receive(3000);
Assert.assertNotNull(messageBeforeCrash);
messageBeforeCrash.acknowledge();
clientSession.commit();
Assert.assertEquals(1, testQueueControlBeforeCrash.getMessageCount());
crashAndWaitForFailure(getServer(serverIdBeforeCrash), clientSession);
Assert.assertEquals(TEST_PARAM, sessionFactory.getConnectorConfiguration().getExtraParams().get(TEST_PARAM));
int serverIdAfterCrash = Integer.parseInt(sessionFactory.
getConnectorConfiguration().getName().substring(4));
Assert.assertNotEquals(serverIdBeforeCrash, serverIdAfterCrash);
Assert.assertTrue(isLiveServerID(serverIdAfterCrash));
QueueControl testQueueControlAfterCrash = (QueueControl)getServer(serverIdAfterCrash).
getManagementService().getResource(ResourceNames.QUEUE + QUEUE_NAME);
Wait.waitFor(() -> testQueueControlAfterCrash.getMessageCount() == 1, 3000);
Assert.assertNotNull(clientConsumer.receive());
}
clientSession.stop();
}
}
}
}
@Test
public void testAutoCreatedQueueAfterFailoverWithoutHA() throws Exception {
setupCluster();
startServers(getLiveServerIDs());
for (int i : getLiveServerIDs()) {
waitForTopology(servers[i], 3, 0);
}
for (int i : getLiveServerIDs()) {
setupSessionFactory(i, i + 3, isNetty(), false);
}
List<TransportConfiguration> transportConfigList = new ArrayList<>();
for (int i : getLiveServerIDs()) {
Map<String, Object> params = generateParams(i, isNetty());
TransportConfiguration serverToTC = createTransportConfiguration("node" + i, isNetty(), false, params);
serverToTC.getExtraParams().put(TEST_PARAM, TEST_PARAM);
transportConfigList.add(serverToTC);
}
TransportConfiguration[] transportConfigs = transportConfigList.toArray(new TransportConfiguration[transportConfigList.size()]);
try (ServerLocator serverLocator = new ServerLocatorImpl(false, transportConfigs)) {
serverLocator.setFailoverAttempts(3);
serverLocator.setReconnectAttempts(0);
serverLocator.setUseTopologyForLoadBalancing(false);
try (ClientSessionFactory sessionFactory = serverLocator.createSessionFactory()) {
try (ClientSession clientSession = sessionFactory.createSession()) {
clientSession.start();
TransportConfiguration backupConnector = (TransportConfiguration) ((ClientSessionFactoryImpl) sessionFactory).getBackupConnector();
Assert.assertNull(backupConnector);
int serverIdBeforeCrash = Integer.parseInt(sessionFactory.getConnectorConfiguration().getName().substring(4));
createQueue(serverIdBeforeCrash, QUEUES_TESTADDRESS, QUEUE_NAME, null, false);
QueueControl testQueueControlBeforeCrash = (QueueControl) getServer(serverIdBeforeCrash).getManagementService().getResource(ResourceNames.QUEUE + QUEUE_NAME);
Assert.assertEquals(0, testQueueControlBeforeCrash.getMessageCount());
for (int i : getLiveServerIDs()) {
if (i != serverIdBeforeCrash) {
Assert.assertNull(getServer(i).getManagementService().getResource(ResourceNames.QUEUE + QUEUE_NAME));
}
}
try (ClientConsumer clientConsumer = clientSession.createConsumer(QUEUE_NAME)) {
try (ClientProducer clientProducer = clientSession.createProducer(QUEUES_TESTADDRESS)) {
clientProducer.send(clientSession.createMessage(true));
}
Wait.waitFor(() -> testQueueControlBeforeCrash.getMessageCount() == 1, 3000);
Assert.assertNotNull(clientConsumer.receive(3000));
crashAndWaitForFailure(getServer(serverIdBeforeCrash), clientSession);
Assert.assertEquals(TEST_PARAM, sessionFactory.getConnectorConfiguration().getExtraParams().get(TEST_PARAM));
int serverIdAfterCrash = Integer.parseInt(sessionFactory.getConnectorConfiguration().getName().substring(4));
Assert.assertNotEquals(serverIdBeforeCrash, serverIdAfterCrash);
boolean serverIdAfterCrashFound = false;
for (int i : getLiveServerIDs()) {
if (i == serverIdAfterCrash) {
serverIdAfterCrashFound = true;
}
}
Assert.assertTrue(serverIdAfterCrashFound);
QueueControl testQueueControlAfterCrash = (QueueControl) getServer(serverIdAfterCrash).getManagementService().getResource(ResourceNames.QUEUE + QUEUE_NAME);
Assert.assertNotNull(testQueueControlAfterCrash);
Assert.assertEquals(0, testQueueControlAfterCrash.getMessageCount());
try (ClientProducer clientProducer = clientSession.createProducer(QUEUES_TESTADDRESS)) {
clientProducer.send(clientSession.createMessage(true));
Wait.waitFor(() -> testQueueControlAfterCrash.getMessageCount() == 1, 3000);
Assert.assertEquals(1, testQueueControlAfterCrash.getMessageCount());
Assert.assertNotNull(clientConsumer.receive(3000));
}
clientSession.stop();
}
}
}
}
}
@Test @Test
public void testJMSConsumerAfterFailover() throws Exception { public void testJMSConsumerAfterFailover() throws Exception {

View File

@ -92,6 +92,7 @@ import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.concurrent.atomic.AtomicInteger;
public class FailoverTest extends FailoverTestBase { public class FailoverTest extends FailoverTestBase {
@ -1991,36 +1992,32 @@ public class FailoverTest extends FailoverTestBase {
sf = createSessionFactoryAndWaitForTopology(locator, 2); sf = createSessionFactoryAndWaitForTopology(locator, 2);
final AtomicBoolean channelLockedDuringFailover = new AtomicBoolean(false); final int reconnectFailures = 3;
final AtomicInteger reconnectRetries = new AtomicInteger(0);
final AtomicBoolean channelLockedDuringFailover = new AtomicBoolean(true);
ClientSession session = createSession(sf, true, true, 0); ClientSession session = createSession(sf, true, true, 0);
backupServer.addInterceptor( backupServer.addInterceptor((packet, connection) -> {
new Interceptor() { if (packet.getType() == PacketImpl.CREATESESSION) {
private int index = 0; if (reconnectRetries.getAndIncrement() < reconnectFailures) {
Channel sessionChannel = ((RemotingConnectionImpl)connection).getChannel(ChannelImpl.CHANNEL_ID.SESSION.id, -1);
@Override sessionChannel.send(new ActiveMQExceptionMessage(new ActiveMQInternalErrorException()));
public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException { return false;
if (index < 1 && packet.getType() == PacketImpl.CREATESESSION) {
sf.getConnection().addCloseListener(() -> {
index++;
ActiveMQSessionContext sessionContext = (ActiveMQSessionContext)((ClientSessionInternal)session).getSessionContext();
channelLockedDuringFailover.set(sessionContext.getSessionChannel().isLocked());
});
Channel sessionChannel = ((RemotingConnectionImpl)connection).getChannel(ChannelImpl.CHANNEL_ID.SESSION.id, -1);
sessionChannel.send(new ActiveMQExceptionMessage(new ActiveMQInternalErrorException()));
return false;
}
return true;
} }
});
ActiveMQSessionContext sessionContext = (ActiveMQSessionContext)((ClientSessionInternal)session).getSessionContext();
channelLockedDuringFailover.compareAndSet(true, sessionContext.getSessionChannel().isLocked());
}
return true;
});
session.start(); session.start();
crash(session); crash(session);
Assert.assertTrue(channelLockedDuringFailover.get()); Assert.assertTrue(channelLockedDuringFailover.get());
Assert.assertEquals(reconnectFailures + 1, reconnectRetries.get());
} }
@Test(timeout = 120000) @Test(timeout = 120000)

View File

@ -28,10 +28,28 @@ public class StaticClusterWithBackupFailoverTest extends ClusterWithBackupFailov
return new int[]{0, 1, 2}; return new int[]{0, 1, 2};
} }
protected boolean isLiveServerID(int id) {
for (int i : getLiveServerIDs()) {
if (i == id) {
return true;
}
}
return false;
}
protected int[] getBackupServerIDs() { protected int[] getBackupServerIDs() {
return new int[]{3, 4, 5}; return new int[]{3, 4, 5};
} }
protected boolean isBackupServerID(int id) {
for (int i : getBackupServerIDs()) {
if (i == id) {
return true;
}
}
return false;
}
@Override @Override
protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception { protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception {
setupClusterConnectionWithBackups("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, new int[]{1, 2}); setupClusterConnectionWithBackups("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, new int[]{1, 2});