https://issues.apache.org/jira/browse/AMQ-3542 - Using failover: with static discovery in a network connector to choose from a master/slave tuple leads to hangs and invalid states. issue with demand forward bridge reacting to failover transport interupt/resume leading to race conditions. race condition with tracking bridges and restarts. change default maxReconnectAttempts=0 to mean none, -1 for infinte. Default behavour is still infinite. To reference a master slave pair, use: static:(failover:(a,b)?maxReconnectAttempts=0)?useExponentialBackOff=false. see org.apache.activemq.network.FailoverStaticNetworkTest

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1183062 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-10-13 20:07:48 +00:00
parent 8a3bdd9f71
commit 4acd13243a
9 changed files with 205 additions and 126 deletions

View File

@ -116,7 +116,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected CountDownLatch localStartedLatch = new CountDownLatch(1);
protected CountDownLatch remoteBrokerNameKnownLatch = new CountDownLatch(1);
protected CountDownLatch localBrokerIdKnownLatch = new CountDownLatch(1);
protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false);
protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
protected NetworkBridgeConfiguration configuration;
protected NetworkBridgeFilterFactory filterFactory;
@ -163,7 +162,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
serviceLocalException(error);
}
});
remoteBroker.setTransportListener(new TransportListener() {
remoteBroker.setTransportListener(new DefaultTransportListener() {
public void onCommand(Object o) {
Command command = (Command) o;
@ -174,55 +173,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
serviceRemoteException(error);
}
public void transportInterupted() {
// clear any subscriptions - to try and prevent the bridge
// from stalling the broker
if (remoteInterupted.compareAndSet(false, true)) {
LOG.info("Outbound transport to " + remoteBrokerName + " interrupted.");
if (localBridgeStarted.get()) {
clearDownSubscriptions();
synchronized (DemandForwardingBridgeSupport.this) {
try {
localBroker.oneway(localConnectionInfo.createRemoveCommand());
} catch (TransportDisposedIOException td) {
LOG.debug("local broker is now disposed", td);
} catch (IOException e) {
LOG.warn("Caught exception from local start", e);
}
}
}
localBridgeStarted.set(false);
remoteBridgeStarted.set(false);
startedLatch = new CountDownLatch(2);
localStartedLatch = new CountDownLatch(1);
}
}
public void transportResumed() {
if (remoteInterupted.compareAndSet(true, false)) {
// We want to slow down false connects so that we don't
// get in a busy loop.
// False connects can occurr if you using SSH tunnels.
if (!lastConnectSucceeded.get()) {
try {
LOG.debug("Previous connection was never fully established. Sleeping for second to avoid busy loop.");
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
lastConnectSucceeded.set(false);
try {
startLocalBridge();
remoteBridgeStarted.set(true);
startedLatch.countDown();
LOG.info("Outbound transport to " + remoteBrokerName + " resumed");
} catch (Throwable e) {
LOG.error("Caught exception from local start in resume transport", e);
serviceLocalException(e);
}
}
}
});
localBroker.start();
@ -260,7 +210,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
asyncTaskRunner.execute(new Runnable() {
public void run() {
final String originalName = Thread.currentThread().getName();
Thread.currentThread().setName("StartRemotelBridge: localBroker=" + localBroker);
Thread.currentThread().setName("StartRemoteBridge: remoteBroker=" + remoteBroker);
try {
startRemoteBridge();
} catch (Exception e) {
@ -782,14 +732,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
serviceLocalBrokerInfo(command);
} else if (command.isShutdownInfo()) {
LOG.info(configuration.getBrokerName() + " Shutting down");
// Don't shut down the whole connector if the remote side
// was interrupted.
// the local transport is just shutting down temporarily
// until the remote side
// is restored.
if (!remoteInterupted.get()) {
stop();
}
stop();
} else if (command.getClass() == ConnectionError.class) {
ConnectionError ce = (ConnectionError) command;
serviceLocalException(ce.getException());

View File

@ -90,9 +90,11 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
return;
}
// Should we try to connect to that URI?
if( bridges.containsKey(uri) ) {
LOG.debug("Discovery agent generated a duplicate onServiceAdd event for: "+uri );
return;
synchronized (bridges) {
if( bridges.containsKey(uri) ) {
LOG.debug("Discovery agent generated a duplicate onServiceAdd event for: "+uri );
return;
}
}
if ( localURI.equals(uri) || (connectionFilter != null && !connectionFilter.connectTo(uri))) {
LOG.debug("not connecting loopback: " + uri);
@ -132,7 +134,9 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
NetworkBridge bridge = createBridge(localTransport, remoteTransport, event);
try {
bridge.start();
bridges.put(uri, bridge);
synchronized (bridges) {
bridges.put(uri, bridge);
}
} catch (Exception e) {
ServiceSupport.dispose(localTransport);
ServiceSupport.dispose(remoteTransport);
@ -158,12 +162,13 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
return;
}
NetworkBridge bridge = bridges.remove(uri);
if (bridge == null) {
return;
NetworkBridge bridge;
synchronized (bridges) {
bridge = bridges.remove(uri);
}
if (bridge != null) {
ServiceSupport.dispose(bridge);
}
ServiceSupport.dispose(bridge);
}
}

View File

@ -58,6 +58,10 @@ public class SimpleDiscoveryAgent implements DiscoveryAgent {
super(service);
}
@Override
public String toString() {
return "[" + serviceName + ", failed:" + failed + ", connectionFailures:" + connectFailures + "]";
}
}
public void setDiscoveryListener(DiscoveryListener listener) {
@ -118,7 +122,7 @@ public class SimpleDiscoveryAgent implements DiscoveryAgent {
event.connectFailures++;
if (maxReconnectAttempts > 0 && event.connectFailures >= maxReconnectAttempts) {
LOG.warn("Reconnect attempts exceeded "+maxReconnectAttempts+" tries. Reconnecting has been disabled.");
LOG.warn("Reconnect attempts exceeded "+maxReconnectAttempts+" tries. Reconnecting has been disabled for: " + event);
return;
}

View File

@ -67,6 +67,7 @@ public class FailoverTransport implements CompositeTransport {
private static final Logger LOG = LoggerFactory.getLogger(FailoverTransport.class);
private static final int DEFAULT_INITIAL_RECONNECT_DELAY = 10;
private static final int INFINITE = -1;
private TransportListener transportListener;
private boolean disposed;
private boolean connected;
@ -89,11 +90,11 @@ public class FailoverTransport implements CompositeTransport {
private long initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
private long maxReconnectDelay = 1000 * 30;
private double backOffMultiplier = 2d;
private long timeout = -1;
private long timeout = INFINITE;
private boolean useExponentialBackOff = true;
private boolean randomize = true;
private int maxReconnectAttempts;
private int startupMaxReconnectAttempts;
private int maxReconnectAttempts = INFINITE;
private int startupMaxReconnectAttempts = INFINITE;
private int connectFailures;
private long reconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
private Exception connectionFailure;
@ -107,8 +108,6 @@ public class FailoverTransport implements CompositeTransport {
private int maxCacheSize = 128 * 1024;
private final TransportListener disposedListener = new DefaultTransportListener() {
};
//private boolean connectionInterruptProcessingComplete;
private final TransportListener myTransportListener = createTransportListener();
private boolean updateURIsSupported = true;
private boolean reconnectSupported = true;
@ -222,12 +221,12 @@ public class FailoverTransport implements CompositeTransport {
boolean reconnectOk = false;
synchronized (reconnectMutex) {
if (started) {
LOG.warn("Transport (" + transport.getRemoteAddress() + ") failed to " + connectedTransportURI
+ " , attempting to automatically reconnect due to: " + e);
LOG.debug("Transport failed with the following exception:", e);
if (canReconnect()) {
reconnectOk = true;
}
LOG.warn("Transport (" + transport.getRemoteAddress() + ") failed, reason: " + e
+ (reconnectOk ? "," : ", not") +" attempting to automatically reconnect");
initialized = false;
failedConnectTransportURI = connectedTransportURI;
connectedTransportURI = null;
@ -240,11 +239,17 @@ public class FailoverTransport implements CompositeTransport {
if (reconnectOk) {
reconnectTask.wakeup();
} else {
propagateFailureToExceptionListener(e);
}
}
}
}
private boolean canReconnect() {
return started && 0 != calculateReconnectAttemptLimit();
}
public final void handleConnectionControl(ConnectionControl control) {
String reconnectStr = control.getReconnectTo();
if (reconnectStr != null) {
@ -292,7 +297,9 @@ public class FailoverTransport implements CompositeTransport {
public void start() throws Exception {
synchronized (reconnectMutex) {
LOG.debug("Started.");
if (LOG.isDebugEnabled()) {
LOG.debug("Started " + this);
}
if (started) {
return;
}
@ -311,7 +318,9 @@ public class FailoverTransport implements CompositeTransport {
public void stop() throws Exception {
Transport transportToStop = null;
synchronized (reconnectMutex) {
LOG.debug("Stopped.");
if (LOG.isDebugEnabled()) {
LOG.debug("Stopped " + this);
}
if (!started) {
return;
}
@ -825,9 +834,7 @@ public class FailoverTransport implements CompositeTransport {
doRebalance = false;
}
if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) {
reconnectDelay = initialReconnectDelay;
}
resetReconnectDelay();
Transport transport = null;
URI uri = null;
@ -845,7 +852,9 @@ public class FailoverTransport implements CompositeTransport {
// for the first time, or we were disposed for some reason.
if (transport == null && !firstConnection && (reconnectDelay > 0) && !disposed) {
synchronized (sleepMutex) {
LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
if (LOG.isDebugEnabled()) {
LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
}
try {
sleepMutex.wait(reconnectDelay);
} catch (InterruptedException e) {
@ -868,16 +877,18 @@ public class FailoverTransport implements CompositeTransport {
}
if (LOG.isDebugEnabled()) {
LOG.debug("Attempting connect to: " + uri);
LOG.debug("Attempting " + connectFailures + "th connect to: " + uri);
}
transport.setTransportListener(myTransportListener);
transport.start();
if (started) {
if (started && !firstConnection) {
restoreTransport(transport);
}
LOG.debug("Connection established");
if (LOG.isDebugEnabled()) {
LOG.debug("Connection established");
}
reconnectDelay = initialReconnectDelay;
connectedTransportURI = uri;
connectedTransport.set(transport);
@ -899,7 +910,9 @@ public class FailoverTransport implements CompositeTransport {
if (transportListener != null) {
transportListener.transportResumed();
} else {
LOG.debug("transport resumed by transport listener not set");
if (LOG.isDebugEnabled()) {
LOG.debug("transport resumed by transport listener not set");
}
}
if (firstConnection) {
@ -934,19 +947,10 @@ public class FailoverTransport implements CompositeTransport {
}
}
int reconnectAttempts = 0;
if (firstConnection) {
if (this.startupMaxReconnectAttempts != 0) {
reconnectAttempts = this.startupMaxReconnectAttempts;
}
}
int reconnectLimit = calculateReconnectAttemptLimit();
if (reconnectAttempts == 0) {
reconnectAttempts = this.maxReconnectAttempts;
}
if (reconnectAttempts > 0 && ++connectFailures >= reconnectAttempts) {
LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
if (reconnectLimit != INFINITE && ++connectFailures >= reconnectLimit) {
LOG.error("Failed to connect to " + uris + " after: " + connectFailures + " attempt(s)");
connectionFailure = failure;
// Make sure on initial startup, that the transportListener has been
@ -960,14 +964,7 @@ public class FailoverTransport implements CompositeTransport {
}
}
if (transportListener != null) {
if (connectionFailure instanceof IOException) {
transportListener.onException((IOException) connectionFailure);
} else {
transportListener.onException(IOExceptionSupport.create(connectionFailure));
}
}
reconnectMutex.notifyAll();
propagateFailureToExceptionListener(connectionFailure);
return false;
}
}
@ -976,7 +973,9 @@ public class FailoverTransport implements CompositeTransport {
if (reconnectDelay > 0) {
synchronized (sleepMutex) {
LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
if (LOG.isDebugEnabled()) {
LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection");
}
try {
sleepMutex.wait(reconnectDelay);
} catch (InterruptedException e) {
@ -997,6 +996,34 @@ public class FailoverTransport implements CompositeTransport {
return !disposed;
}
private void resetReconnectDelay() {
if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) {
reconnectDelay = initialReconnectDelay;
}
}
/*
* called with reconnectMutex held
*/
private void propagateFailureToExceptionListener(Exception exception) {
if (transportListener != null) {
if (exception instanceof IOException) {
transportListener.onException((IOException)exception);
} else {
transportListener.onException(IOExceptionSupport.create(exception));
}
}
reconnectMutex.notifyAll();
}
private int calculateReconnectAttemptLimit() {
int maxReconnectValue = this.maxReconnectAttempts;
if (firstConnection && this.startupMaxReconnectAttempts != INFINITE) {
maxReconnectValue = this.startupMaxReconnectAttempts;
}
return maxReconnectValue;
}
final boolean buildBackups() {
synchronized (backupMutex) {
if (!disposed && backup && backups.size() < backupPoolSize) {

View File

@ -17,12 +17,15 @@
package org.apache.activemq.network;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals;
@ -31,6 +34,7 @@ import static org.junit.Assert.assertTrue;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.management.ObjectName;
@ -42,8 +46,10 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.transport.tcp.SslBrokerServiceTest;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -70,12 +76,16 @@ public class FailoverStaticNetworkTest {
protected BrokerService createBroker(String scheme, String listenPort, String[] networkToPorts,
HashMap<String, String> networkProps) throws Exception {
BrokerService broker = new BrokerService();
//broker.setUseJmx(false);
broker.getManagementContext().setCreateConnector(false);
broker.setSslContext(sslContext);
broker.setDeleteAllMessagesOnStartup(true);
broker.setBrokerName("Broker_" + listenPort);
broker.addConnector(scheme + "://localhost:" + listenPort);
// lazy init listener on broker start
TransportConnector transportConnector = new TransportConnector();
transportConnector.setUri(new URI(scheme + "://localhost:" + listenPort));
List<TransportConnector> transportConnectors = new ArrayList<TransportConnector>();
transportConnectors.add(transportConnector);
broker.setTransportConnectors(transportConnectors);
if (networkToPorts != null && networkToPorts.length > 0) {
StringBuilder builder = new StringBuilder("static:(failover:(" + scheme + "://localhost:");
builder.append(networkToPorts[0]);
@ -84,7 +94,7 @@ public class FailoverStaticNetworkTest {
}
// limit the reconnects in case of initial random connection to slave
// leaving randomize on verifies that this config is picked up
builder.append(")?maxReconnectAttempts=1)");
builder.append(")?maxReconnectAttempts=0)?useExponentialBackOff=false");
NetworkConnector nc = broker.addNetworkConnector(builder.toString());
if (networkProps != null) {
IntrospectionSupport.setProperties(nc, networkProps);
@ -309,11 +319,89 @@ public class FailoverStaticNetworkTest {
doTestNetworkSendReceive();
}
@Test
public void testRepeatedSendReceiveWithMasterSlaveAlternate() throws Exception {
brokerB = createBroker("tcp", "62617", new String[]{"61610","61611"});
brokerB.start();
final AtomicBoolean done = new AtomicBoolean(false);
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(new Runnable() {
@Override
public void run() {
try {
while (!done.get()) {
brokerA = createBroker("tcp", "61610", null);
brokerA.setBrokerName("Pair");
brokerA.setBrokerObjectName(new ObjectName(brokerA.getManagementContext().getJmxDomainName() + ":" + "BrokerName="
+ JMXSupport.encodeObjectNamePart("A") + "," + "Type=Broker"));
((KahaDBPersistenceAdapter)brokerA.getPersistenceAdapter()).setDatabaseLockedWaitDelay(1000);
brokerA.start();
brokerA.waitUntilStopped();
// restart after peer taken over
brokerA1.waitUntilStarted();
}
} catch (Exception ignored) {
LOG.info("A create/start, unexpected: " + ignored, ignored);
}
}
});
// start with brokerA as master
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return brokerA != null && brokerA.waitUntilStarted();
}
});
executorService.execute(new Runnable() {
@Override
public void run() {
try {
while (!done.get()) {
brokerA1 = createBroker("tcp", "61611", null);
brokerA1.setBrokerName("Pair");
// so they can coexist in local jmx we set the object name b/c the brokername identifies the shared store
brokerA1.setBrokerObjectName(new ObjectName(brokerA.getManagementContext().getJmxDomainName() + ":" + "BrokerName="
+ JMXSupport.encodeObjectNamePart("A1") + "," + "Type=Broker"));
((KahaDBPersistenceAdapter)brokerA1.getPersistenceAdapter()).setDatabaseLockedWaitDelay(1000);
brokerA1.start();
brokerA1.waitUntilStopped();
// restart after peer taken over
brokerA.waitUntilStarted();
}
} catch (Exception ignored) {
LOG.info("A1 create/start, unexpected: " + ignored, ignored);
}
}
});
for (int i=0; i<10; i++) {
BrokerService currentMaster = (i%2 == 0 ? brokerA : brokerA1);
LOG.info("iteration: " + i + ", using: " + currentMaster.getBrokerObjectName().getKeyProperty("BrokerName"));
currentMaster.waitUntilStarted();
doTestNetworkSendReceive(brokerB, currentMaster);
LOG.info("Stopping " + currentMaster.getBrokerObjectName().getKeyProperty("BrokerName"));
currentMaster.stop();
currentMaster.waitUntilStopped();
}
done.set(false);
LOG.info("all done");
executorService.shutdownNow();
}
private void doTestNetworkSendReceive() throws Exception, JMSException {
doTestNetworkSendReceive(brokerB, brokerA);
}
private void doTestNetworkSendReceive(BrokerService to, BrokerService from) throws Exception, JMSException {
private void doTestNetworkSendReceive(final BrokerService to, final BrokerService from) throws Exception, JMSException {
LOG.info("Creating Consumer on the networked broker ..." + from);
@ -332,7 +420,9 @@ public class FailoverStaticNetworkTest {
boolean gotMessage = Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return consumer.receive(1000) != null;
Message message = consumer.receive(5000);
LOG.info("from: " + from.getBrokerObjectName().getKeyProperty("BrokerName") + ", received: " + message);
return message != null;
}
});
try {

View File

@ -54,7 +54,7 @@ public class DiscoveryUriTest extends EmbeddedBrokerTestSupport {
public void testFailedConnect() throws Exception {
try {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("discovery:(multicast://default?group=test1)?reconnectDelay=1000&maxReconnectAttempts=3&useExponentialBackOff=false");
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("discovery:(multicast://default?group=test1)?reconnectDelay=1000&startupMaxReconnectAttempts=3&useExponentialBackOff=false");
Connection conn = factory.createConnection();
conn.start();
} catch (Exception e) {

View File

@ -19,6 +19,7 @@ package org.apache.activemq.transport.failover;
import java.io.IOException;
import java.net.*;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import javax.jms.Connection;
import javax.net.ServerSocketFactory;
@ -30,17 +31,20 @@ import org.apache.activemq.util.Wait;
public class SlowConnectionTest extends TestCase {
private CountDownLatch socketReadyLatch = new CountDownLatch(1);
public void testSlowConnection() throws Exception {
int timeout = 1000;
URI tcpUri = new URI("tcp://localhost:61616?soTimeout=" + timeout + "&trace=true&connectionTimeout=" + timeout + "&wireFormat.maxInactivityDurationInitalDelay=" + timeout);
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + tcpUri + ")");
final Connection connection = cf.createConnection();
MockBroker broker = new MockBroker();
broker.start();
socketReadyLatch.await();
int timeout = 1000;
URI tcpUri = new URI("tcp://localhost:" + broker.ss.getLocalPort() + "?soTimeout=" + timeout + "&trace=true&connectionTimeout=" + timeout + "&wireFormat.maxInactivityDurationInitalDelay=" + timeout);
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + tcpUri + ")");
final Connection connection = cf.createConnection();
new Thread(new Runnable() {
public void run() {
try { connection.start(); } catch (Throwable ignored) {}
@ -62,19 +66,25 @@ public class SlowConnectionTest extends TestCase {
}
class MockBroker extends Thread {
ServerSocket ss = null;
public MockBroker() {
super("MockBroker");
}
public void run() {
List<Socket> inProgress = new ArrayList<Socket>();
ServerSocketFactory factory = ServerSocketFactory.getDefault();
ServerSocket ss = null;
try {
ss = factory.createServerSocket(61616);
ss = factory.createServerSocket(0);
ss.setSoTimeout(5000);
socketReadyLatch.countDown();
while (!interrupted()) {
inProgress.add(ss.accept()); // eat socket
}
} catch (java.net.SocketTimeoutException expected) {
} catch (Exception e) {
e.printStackTrace();
} finally {

View File

@ -320,7 +320,7 @@ public class DurableSubProcessTest extends org.apache.activemq.TestSupport {
"jms.alwaysSyncSend=true&jms.dispatchAsync=true&" +
"jms.producerWindowSize=20971520&" +
"jms.copyMessageOnSend=false&" +
"initialReconnectDelay=100&maxReconnectDelay=30000&maxReconnectAttempts=0&" +
"initialReconnectDelay=100&maxReconnectDelay=30000&" +
"useExponentialBackOff=true";
final ConnectionFactory cf = new ActiveMQConnectionFactory(url);

View File

@ -398,7 +398,7 @@ public class DurableSubProcessWithRestartTest {
+ "jms.alwaysSyncSend=true&jms.dispatchAsync=true&"
+ "jms.producerWindowSize=20971520&"
+ "jms.copyMessageOnSend=false&"
+ "initialReconnectDelay=100&maxReconnectDelay=30000&maxReconnectAttempts=0&"
+ "initialReconnectDelay=100&maxReconnectDelay=30000&"
+ "useExponentialBackOff=true";
final ConnectionFactory cf = new ActiveMQConnectionFactory(url);