mirror of https://github.com/apache/activemq.git
fix for incorrect logic in reconnect attempt limits and also fix a case where an NPE could occur (saw in the tests). Based on patch provided by Benoit Wiart
This commit is contained in:
parent
91d48280b8
commit
1b493749a1
|
@ -16,6 +16,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.network.jms;
|
package org.apache.activemq.network.jms;
|
||||||
|
|
||||||
|
import static org.apache.activemq.network.jms.ReconnectionPolicy.INFINITE;
|
||||||
|
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -71,7 +73,7 @@ public abstract class JmsConnector implements Service {
|
||||||
protected LRUCache<Destination, DestinationBridge> replyToBridges = createLRUCache();
|
protected LRUCache<Destination, DestinationBridge> replyToBridges = createLRUCache();
|
||||||
|
|
||||||
private ReconnectionPolicy policy = new ReconnectionPolicy();
|
private ReconnectionPolicy policy = new ReconnectionPolicy();
|
||||||
protected ThreadPoolExecutor connectionSerivce;
|
protected ThreadPoolExecutor connectionService;
|
||||||
private final List<DestinationBridge> inboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
|
private final List<DestinationBridge> inboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
|
||||||
private final List<DestinationBridge> outboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
|
private final List<DestinationBridge> outboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
|
||||||
private String name;
|
private String name;
|
||||||
|
@ -116,7 +118,7 @@ public abstract class JmsConnector implements Service {
|
||||||
}
|
}
|
||||||
replyToBridges.setMaxCacheSize(getReplyToDestinationCacheSize());
|
replyToBridges.setMaxCacheSize(getReplyToDestinationCacheSize());
|
||||||
|
|
||||||
connectionSerivce = createExecutor();
|
connectionService = createExecutor();
|
||||||
|
|
||||||
// Subclasses can override this to customize their own it.
|
// Subclasses can override this to customize their own it.
|
||||||
result = doConnectorInit();
|
result = doConnectorInit();
|
||||||
|
@ -169,8 +171,8 @@ public abstract class JmsConnector implements Service {
|
||||||
public void stop() throws Exception {
|
public void stop() throws Exception {
|
||||||
if (started.compareAndSet(true, false)) {
|
if (started.compareAndSet(true, false)) {
|
||||||
|
|
||||||
ThreadPoolUtils.shutdown(connectionSerivce);
|
ThreadPoolUtils.shutdown(connectionService);
|
||||||
connectionSerivce = null;
|
connectionService = null;
|
||||||
|
|
||||||
if (foreignConnection.get() != null) {
|
if (foreignConnection.get() != null) {
|
||||||
try {
|
try {
|
||||||
|
@ -509,7 +511,7 @@ public abstract class JmsConnector implements Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
// We got here first and cleared the connection, now we queue a reconnect.
|
// We got here first and cleared the connection, now we queue a reconnect.
|
||||||
this.connectionSerivce.execute(new Runnable() {
|
this.connectionService.execute(new Runnable() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
@ -534,7 +536,7 @@ public abstract class JmsConnector implements Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
// We got here first and cleared the connection, now we queue a reconnect.
|
// We got here first and cleared the connection, now we queue a reconnect.
|
||||||
this.connectionSerivce.execute(new Runnable() {
|
this.connectionService.execute(new Runnable() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
@ -549,7 +551,7 @@ public abstract class JmsConnector implements Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void scheduleAsyncLocalConnectionReconnect() {
|
private void scheduleAsyncLocalConnectionReconnect() {
|
||||||
this.connectionSerivce.execute(new Runnable() {
|
this.connectionService.execute(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
|
@ -562,7 +564,7 @@ public abstract class JmsConnector implements Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void scheduleAsyncForeignConnectionReconnect() {
|
private void scheduleAsyncForeignConnectionReconnect() {
|
||||||
this.connectionSerivce.execute(new Runnable() {
|
this.connectionService.execute(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
|
@ -576,6 +578,7 @@ public abstract class JmsConnector implements Service {
|
||||||
|
|
||||||
private void doInitializeConnection(boolean local) throws Exception {
|
private void doInitializeConnection(boolean local) throws Exception {
|
||||||
|
|
||||||
|
ThreadPoolExecutor connectionService = this.connectionService;
|
||||||
int attempt = 0;
|
int attempt = 0;
|
||||||
|
|
||||||
final int maxRetries;
|
final int maxRetries;
|
||||||
|
@ -587,8 +590,7 @@ public abstract class JmsConnector implements Service {
|
||||||
policy.getMaxReconnectAttempts();
|
policy.getMaxReconnectAttempts();
|
||||||
}
|
}
|
||||||
|
|
||||||
do
|
do {
|
||||||
{
|
|
||||||
if (attempt > 0) {
|
if (attempt > 0) {
|
||||||
try {
|
try {
|
||||||
Thread.sleep(policy.getNextDelay(attempt));
|
Thread.sleep(policy.getNextDelay(attempt));
|
||||||
|
@ -596,7 +598,7 @@ public abstract class JmsConnector implements Service {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (connectionSerivce.isTerminating()) {
|
if (connectionService.isTerminating()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -625,7 +627,7 @@ public abstract class JmsConnector implements Service {
|
||||||
LOG.debug("Failed to establish initial {} connection for JmsConnector [{}]", new Object[]{ (local ? "local" : "foreign"), attempt }, e);
|
LOG.debug("Failed to establish initial {} connection for JmsConnector [{}]", new Object[]{ (local ? "local" : "foreign"), attempt }, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
while (maxRetries < ++attempt && !connectionSerivce.isTerminating());
|
while ((maxRetries == INFINITE || maxRetries > ++attempt) && !connectionService.isShutdown());
|
||||||
|
|
||||||
this.failed.set(true);
|
this.failed.set(true);
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,11 +25,13 @@ package org.apache.activemq.network.jms;
|
||||||
*/
|
*/
|
||||||
public class ReconnectionPolicy {
|
public class ReconnectionPolicy {
|
||||||
|
|
||||||
|
public static final int INFINITE = -1;
|
||||||
|
|
||||||
private int maxSendRetries = 10;
|
private int maxSendRetries = 10;
|
||||||
private long sendRetryDelay = 1000L;
|
private long sendRetryDelay = 1000L;
|
||||||
|
|
||||||
private int maxReconnectAttempts = -1;
|
private int maxReconnectAttempts = INFINITE;
|
||||||
private int maxInitialConnectAttempts = -1;
|
private int maxInitialConnectAttempts = INFINITE;
|
||||||
private long maximumReconnectDelay = 30000;
|
private long maximumReconnectDelay = 30000;
|
||||||
private long initialReconnectDelay = 1000L;
|
private long initialReconnectDelay = 1000L;
|
||||||
private boolean useExponentialBackOff = false;
|
private boolean useExponentialBackOff = false;
|
||||||
|
|
Loading…
Reference in New Issue