diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 9e5ec30851..cb6a2dcd4a 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -116,7 +116,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br protected CountDownLatch localStartedLatch = new CountDownLatch(1); protected CountDownLatch remoteBrokerNameKnownLatch = new CountDownLatch(1); protected CountDownLatch localBrokerIdKnownLatch = new CountDownLatch(1); - protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false); protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false); protected NetworkBridgeConfiguration configuration; protected NetworkBridgeFilterFactory filterFactory; @@ -163,7 +162,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br serviceLocalException(error); } }); - remoteBroker.setTransportListener(new TransportListener() { + remoteBroker.setTransportListener(new DefaultTransportListener() { public void onCommand(Object o) { Command command = (Command) o; @@ -174,55 +173,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br serviceRemoteException(error); } - public void transportInterupted() { - // clear any subscriptions - to try and prevent the bridge - // from stalling the broker - if (remoteInterupted.compareAndSet(false, true)) { - LOG.info("Outbound transport to " + remoteBrokerName + " interrupted."); - if (localBridgeStarted.get()) { - clearDownSubscriptions(); - synchronized (DemandForwardingBridgeSupport.this) { - try { - localBroker.oneway(localConnectionInfo.createRemoveCommand()); - } catch (TransportDisposedIOException td) { - LOG.debug("local broker is now disposed", td); - } catch (IOException e) { - LOG.warn("Caught exception from local start", e); - } - } - } - localBridgeStarted.set(false); - remoteBridgeStarted.set(false); - startedLatch = new CountDownLatch(2); - localStartedLatch = new CountDownLatch(1); - } - } - - public void transportResumed() { - if (remoteInterupted.compareAndSet(true, false)) { - // We want to slow down false connects so that we don't - // get in a busy loop. - // False connects can occurr if you using SSH tunnels. - if (!lastConnectSucceeded.get()) { - try { - LOG.debug("Previous connection was never fully established. Sleeping for second to avoid busy loop."); - Thread.sleep(1000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - lastConnectSucceeded.set(false); - try { - startLocalBridge(); - remoteBridgeStarted.set(true); - startedLatch.countDown(); - LOG.info("Outbound transport to " + remoteBrokerName + " resumed"); - } catch (Throwable e) { - LOG.error("Caught exception from local start in resume transport", e); - serviceLocalException(e); - } - } - } }); localBroker.start(); @@ -260,7 +210,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br asyncTaskRunner.execute(new Runnable() { public void run() { final String originalName = Thread.currentThread().getName(); - Thread.currentThread().setName("StartRemotelBridge: localBroker=" + localBroker); + Thread.currentThread().setName("StartRemoteBridge: remoteBroker=" + remoteBroker); try { startRemoteBridge(); } catch (Exception e) { @@ -782,14 +732,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br serviceLocalBrokerInfo(command); } else if (command.isShutdownInfo()) { LOG.info(configuration.getBrokerName() + " Shutting down"); - // Don't shut down the whole connector if the remote side - // was interrupted. - // the local transport is just shutting down temporarily - // until the remote side - // is restored. - if (!remoteInterupted.get()) { - stop(); - } + stop(); } else if (command.getClass() == ConnectionError.class) { ConnectionError ce = (ConnectionError) command; serviceLocalException(ce.getException()); diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java index 40173f4f74..c803a50149 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java @@ -90,9 +90,11 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco return; } // Should we try to connect to that URI? - if( bridges.containsKey(uri) ) { - LOG.debug("Discovery agent generated a duplicate onServiceAdd event for: "+uri ); - return; + synchronized (bridges) { + if( bridges.containsKey(uri) ) { + LOG.debug("Discovery agent generated a duplicate onServiceAdd event for: "+uri ); + return; + } } if ( localURI.equals(uri) || (connectionFilter != null && !connectionFilter.connectTo(uri))) { LOG.debug("not connecting loopback: " + uri); @@ -132,7 +134,9 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco NetworkBridge bridge = createBridge(localTransport, remoteTransport, event); try { bridge.start(); - bridges.put(uri, bridge); + synchronized (bridges) { + bridges.put(uri, bridge); + } } catch (Exception e) { ServiceSupport.dispose(localTransport); ServiceSupport.dispose(remoteTransport); @@ -158,12 +162,13 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco return; } - NetworkBridge bridge = bridges.remove(uri); - if (bridge == null) { - return; + NetworkBridge bridge; + synchronized (bridges) { + bridge = bridges.remove(uri); + } + if (bridge != null) { + ServiceSupport.dispose(bridge); } - - ServiceSupport.dispose(bridge); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java b/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java index 2a9cd5adbc..ad884ad5b8 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java @@ -58,6 +58,10 @@ public class SimpleDiscoveryAgent implements DiscoveryAgent { super(service); } + @Override + public String toString() { + return "[" + serviceName + ", failed:" + failed + ", connectionFailures:" + connectFailures + "]"; + } } public void setDiscoveryListener(DiscoveryListener listener) { @@ -118,7 +122,7 @@ public class SimpleDiscoveryAgent implements DiscoveryAgent { event.connectFailures++; if (maxReconnectAttempts > 0 && event.connectFailures >= maxReconnectAttempts) { - LOG.warn("Reconnect attempts exceeded "+maxReconnectAttempts+" tries. Reconnecting has been disabled."); + LOG.warn("Reconnect attempts exceeded "+maxReconnectAttempts+" tries. Reconnecting has been disabled for: " + event); return; } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index dceba66706..14bf77e40c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -67,6 +67,7 @@ public class FailoverTransport implements CompositeTransport { private static final Logger LOG = LoggerFactory.getLogger(FailoverTransport.class); private static final int DEFAULT_INITIAL_RECONNECT_DELAY = 10; + private static final int INFINITE = -1; private TransportListener transportListener; private boolean disposed; private boolean connected; @@ -89,11 +90,11 @@ public class FailoverTransport implements CompositeTransport { private long initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY; private long maxReconnectDelay = 1000 * 30; private double backOffMultiplier = 2d; - private long timeout = -1; + private long timeout = INFINITE; private boolean useExponentialBackOff = true; private boolean randomize = true; - private int maxReconnectAttempts; - private int startupMaxReconnectAttempts; + private int maxReconnectAttempts = INFINITE; + private int startupMaxReconnectAttempts = INFINITE; private int connectFailures; private long reconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY; private Exception connectionFailure; @@ -107,8 +108,6 @@ public class FailoverTransport implements CompositeTransport { private int maxCacheSize = 128 * 1024; private final TransportListener disposedListener = new DefaultTransportListener() { }; - //private boolean connectionInterruptProcessingComplete; - private final TransportListener myTransportListener = createTransportListener(); private boolean updateURIsSupported = true; private boolean reconnectSupported = true; @@ -222,12 +221,12 @@ public class FailoverTransport implements CompositeTransport { boolean reconnectOk = false; synchronized (reconnectMutex) { - if (started) { - LOG.warn("Transport (" + transport.getRemoteAddress() + ") failed to " + connectedTransportURI - + " , attempting to automatically reconnect due to: " + e); - LOG.debug("Transport failed with the following exception:", e); + if (canReconnect()) { reconnectOk = true; } + LOG.warn("Transport (" + transport.getRemoteAddress() + ") failed, reason: " + e + + (reconnectOk ? "," : ", not") +" attempting to automatically reconnect"); + initialized = false; failedConnectTransportURI = connectedTransportURI; connectedTransportURI = null; @@ -240,11 +239,17 @@ public class FailoverTransport implements CompositeTransport { if (reconnectOk) { reconnectTask.wakeup(); + } else { + propagateFailureToExceptionListener(e); } } } } + private boolean canReconnect() { + return started && 0 != calculateReconnectAttemptLimit(); + } + public final void handleConnectionControl(ConnectionControl control) { String reconnectStr = control.getReconnectTo(); if (reconnectStr != null) { @@ -292,7 +297,9 @@ public class FailoverTransport implements CompositeTransport { public void start() throws Exception { synchronized (reconnectMutex) { - LOG.debug("Started."); + if (LOG.isDebugEnabled()) { + LOG.debug("Started " + this); + } if (started) { return; } @@ -311,7 +318,9 @@ public class FailoverTransport implements CompositeTransport { public void stop() throws Exception { Transport transportToStop = null; synchronized (reconnectMutex) { - LOG.debug("Stopped."); + if (LOG.isDebugEnabled()) { + LOG.debug("Stopped " + this); + } if (!started) { return; } @@ -825,9 +834,7 @@ public class FailoverTransport implements CompositeTransport { doRebalance = false; } - if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) { - reconnectDelay = initialReconnectDelay; - } + resetReconnectDelay(); Transport transport = null; URI uri = null; @@ -845,7 +852,9 @@ public class FailoverTransport implements CompositeTransport { // for the first time, or we were disposed for some reason. if (transport == null && !firstConnection && (reconnectDelay > 0) && !disposed) { synchronized (sleepMutex) { - LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. "); + if (LOG.isDebugEnabled()) { + LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. "); + } try { sleepMutex.wait(reconnectDelay); } catch (InterruptedException e) { @@ -868,16 +877,18 @@ public class FailoverTransport implements CompositeTransport { } if (LOG.isDebugEnabled()) { - LOG.debug("Attempting connect to: " + uri); + LOG.debug("Attempting " + connectFailures + "th connect to: " + uri); } transport.setTransportListener(myTransportListener); transport.start(); - if (started) { + if (started && !firstConnection) { restoreTransport(transport); } - LOG.debug("Connection established"); + if (LOG.isDebugEnabled()) { + LOG.debug("Connection established"); + } reconnectDelay = initialReconnectDelay; connectedTransportURI = uri; connectedTransport.set(transport); @@ -899,7 +910,9 @@ public class FailoverTransport implements CompositeTransport { if (transportListener != null) { transportListener.transportResumed(); } else { - LOG.debug("transport resumed by transport listener not set"); + if (LOG.isDebugEnabled()) { + LOG.debug("transport resumed by transport listener not set"); + } } if (firstConnection) { @@ -934,19 +947,10 @@ public class FailoverTransport implements CompositeTransport { } } - int reconnectAttempts = 0; - if (firstConnection) { - if (this.startupMaxReconnectAttempts != 0) { - reconnectAttempts = this.startupMaxReconnectAttempts; - } - } + int reconnectLimit = calculateReconnectAttemptLimit(); - if (reconnectAttempts == 0) { - reconnectAttempts = this.maxReconnectAttempts; - } - - if (reconnectAttempts > 0 && ++connectFailures >= reconnectAttempts) { - LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)"); + if (reconnectLimit != INFINITE && ++connectFailures >= reconnectLimit) { + LOG.error("Failed to connect to " + uris + " after: " + connectFailures + " attempt(s)"); connectionFailure = failure; // Make sure on initial startup, that the transportListener has been @@ -960,14 +964,7 @@ public class FailoverTransport implements CompositeTransport { } } - if (transportListener != null) { - if (connectionFailure instanceof IOException) { - transportListener.onException((IOException) connectionFailure); - } else { - transportListener.onException(IOExceptionSupport.create(connectionFailure)); - } - } - reconnectMutex.notifyAll(); + propagateFailureToExceptionListener(connectionFailure); return false; } } @@ -976,7 +973,9 @@ public class FailoverTransport implements CompositeTransport { if (reconnectDelay > 0) { synchronized (sleepMutex) { - LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. "); + if (LOG.isDebugEnabled()) { + LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection"); + } try { sleepMutex.wait(reconnectDelay); } catch (InterruptedException e) { @@ -997,6 +996,34 @@ public class FailoverTransport implements CompositeTransport { return !disposed; } + private void resetReconnectDelay() { + if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) { + reconnectDelay = initialReconnectDelay; + } + } + + /* + * called with reconnectMutex held + */ + private void propagateFailureToExceptionListener(Exception exception) { + if (transportListener != null) { + if (exception instanceof IOException) { + transportListener.onException((IOException)exception); + } else { + transportListener.onException(IOExceptionSupport.create(exception)); + } + } + reconnectMutex.notifyAll(); + } + + private int calculateReconnectAttemptLimit() { + int maxReconnectValue = this.maxReconnectAttempts; + if (firstConnection && this.startupMaxReconnectAttempts != INFINITE) { + maxReconnectValue = this.startupMaxReconnectAttempts; + } + return maxReconnectValue; + } + final boolean buildBackups() { synchronized (backupMutex) { if (!disposed && backup && backups.size() < backupPoolSize) { diff --git a/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java b/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java index 064166d162..bc6819ff5e 100644 --- a/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java @@ -17,12 +17,15 @@ package org.apache.activemq.network; import java.net.URI; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.Vector; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.Assert.assertEquals; @@ -31,6 +34,7 @@ import static org.junit.Assert.assertTrue; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; +import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.management.ObjectName; @@ -42,8 +46,10 @@ import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.SslContext; import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; import org.apache.activemq.transport.tcp.SslBrokerServiceTest; import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.util.JMXSupport; import org.apache.activemq.util.Wait; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,12 +76,16 @@ public class FailoverStaticNetworkTest { protected BrokerService createBroker(String scheme, String listenPort, String[] networkToPorts, HashMap networkProps) throws Exception { BrokerService broker = new BrokerService(); - //broker.setUseJmx(false); broker.getManagementContext().setCreateConnector(false); broker.setSslContext(sslContext); broker.setDeleteAllMessagesOnStartup(true); broker.setBrokerName("Broker_" + listenPort); - broker.addConnector(scheme + "://localhost:" + listenPort); + // lazy init listener on broker start + TransportConnector transportConnector = new TransportConnector(); + transportConnector.setUri(new URI(scheme + "://localhost:" + listenPort)); + List transportConnectors = new ArrayList(); + transportConnectors.add(transportConnector); + broker.setTransportConnectors(transportConnectors); if (networkToPorts != null && networkToPorts.length > 0) { StringBuilder builder = new StringBuilder("static:(failover:(" + scheme + "://localhost:"); builder.append(networkToPorts[0]); @@ -84,7 +94,7 @@ public class FailoverStaticNetworkTest { } // limit the reconnects in case of initial random connection to slave // leaving randomize on verifies that this config is picked up - builder.append(")?maxReconnectAttempts=1)"); + builder.append(")?maxReconnectAttempts=0)?useExponentialBackOff=false"); NetworkConnector nc = broker.addNetworkConnector(builder.toString()); if (networkProps != null) { IntrospectionSupport.setProperties(nc, networkProps); @@ -309,11 +319,89 @@ public class FailoverStaticNetworkTest { doTestNetworkSendReceive(); } + @Test + public void testRepeatedSendReceiveWithMasterSlaveAlternate() throws Exception { + + brokerB = createBroker("tcp", "62617", new String[]{"61610","61611"}); + brokerB.start(); + + final AtomicBoolean done = new AtomicBoolean(false); + ExecutorService executorService = Executors.newCachedThreadPool(); + executorService.execute(new Runnable() { + @Override + public void run() { + try { + while (!done.get()) { + brokerA = createBroker("tcp", "61610", null); + brokerA.setBrokerName("Pair"); + brokerA.setBrokerObjectName(new ObjectName(brokerA.getManagementContext().getJmxDomainName() + ":" + "BrokerName=" + + JMXSupport.encodeObjectNamePart("A") + "," + "Type=Broker")); + ((KahaDBPersistenceAdapter)brokerA.getPersistenceAdapter()).setDatabaseLockedWaitDelay(1000); + brokerA.start(); + brokerA.waitUntilStopped(); + + // restart after peer taken over + brokerA1.waitUntilStarted(); + } + } catch (Exception ignored) { + LOG.info("A create/start, unexpected: " + ignored, ignored); + } + } + }); + + // start with brokerA as master + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return brokerA != null && brokerA.waitUntilStarted(); + } + }); + + executorService.execute(new Runnable() { + @Override + public void run() { + try { + while (!done.get()) { + brokerA1 = createBroker("tcp", "61611", null); + brokerA1.setBrokerName("Pair"); + // so they can coexist in local jmx we set the object name b/c the brokername identifies the shared store + brokerA1.setBrokerObjectName(new ObjectName(brokerA.getManagementContext().getJmxDomainName() + ":" + "BrokerName=" + + JMXSupport.encodeObjectNamePart("A1") + "," + "Type=Broker")); + ((KahaDBPersistenceAdapter)brokerA1.getPersistenceAdapter()).setDatabaseLockedWaitDelay(1000); + brokerA1.start(); + brokerA1.waitUntilStopped(); + + // restart after peer taken over + brokerA.waitUntilStarted(); + } + } catch (Exception ignored) { + LOG.info("A1 create/start, unexpected: " + ignored, ignored); + } + } + }); + + for (int i=0; i<10; i++) { + BrokerService currentMaster = (i%2 == 0 ? brokerA : brokerA1); + LOG.info("iteration: " + i + ", using: " + currentMaster.getBrokerObjectName().getKeyProperty("BrokerName")); + currentMaster.waitUntilStarted(); + + doTestNetworkSendReceive(brokerB, currentMaster); + + LOG.info("Stopping " + currentMaster.getBrokerObjectName().getKeyProperty("BrokerName")); + currentMaster.stop(); + currentMaster.waitUntilStopped(); + } + + done.set(false); + LOG.info("all done"); + executorService.shutdownNow(); + } + private void doTestNetworkSendReceive() throws Exception, JMSException { doTestNetworkSendReceive(brokerB, brokerA); } - private void doTestNetworkSendReceive(BrokerService to, BrokerService from) throws Exception, JMSException { + private void doTestNetworkSendReceive(final BrokerService to, final BrokerService from) throws Exception, JMSException { LOG.info("Creating Consumer on the networked broker ..." + from); @@ -332,7 +420,9 @@ public class FailoverStaticNetworkTest { boolean gotMessage = Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { - return consumer.receive(1000) != null; + Message message = consumer.receive(5000); + LOG.info("from: " + from.getBrokerObjectName().getKeyProperty("BrokerName") + ", received: " + message); + return message != null; } }); try { diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryUriTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryUriTest.java index ddbf45d1a9..778d03d9f0 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryUriTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryUriTest.java @@ -54,7 +54,7 @@ public class DiscoveryUriTest extends EmbeddedBrokerTestSupport { public void testFailedConnect() throws Exception { try { - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("discovery:(multicast://default?group=test1)?reconnectDelay=1000&maxReconnectAttempts=3&useExponentialBackOff=false"); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("discovery:(multicast://default?group=test1)?reconnectDelay=1000&startupMaxReconnectAttempts=3&useExponentialBackOff=false"); Connection conn = factory.createConnection(); conn.start(); } catch (Exception e) { diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java index 94857f3141..357e0fbcb7 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java @@ -19,6 +19,7 @@ package org.apache.activemq.transport.failover; import java.io.IOException; import java.net.*; import java.util.*; +import java.util.concurrent.CountDownLatch; import javax.jms.Connection; import javax.net.ServerSocketFactory; @@ -30,17 +31,20 @@ import org.apache.activemq.util.Wait; public class SlowConnectionTest extends TestCase { + private CountDownLatch socketReadyLatch = new CountDownLatch(1); + public void testSlowConnection() throws Exception { - int timeout = 1000; - URI tcpUri = new URI("tcp://localhost:61616?soTimeout=" + timeout + "&trace=true&connectionTimeout=" + timeout + "&wireFormat.maxInactivityDurationInitalDelay=" + timeout); - - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + tcpUri + ")"); - final Connection connection = cf.createConnection(); - MockBroker broker = new MockBroker(); broker.start(); + socketReadyLatch.await(); + int timeout = 1000; + URI tcpUri = new URI("tcp://localhost:" + broker.ss.getLocalPort() + "?soTimeout=" + timeout + "&trace=true&connectionTimeout=" + timeout + "&wireFormat.maxInactivityDurationInitalDelay=" + timeout); + + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + tcpUri + ")"); + final Connection connection = cf.createConnection(); + new Thread(new Runnable() { public void run() { try { connection.start(); } catch (Throwable ignored) {} @@ -62,19 +66,25 @@ public class SlowConnectionTest extends TestCase { } class MockBroker extends Thread { + ServerSocket ss = null; + public MockBroker() { + super("MockBroker"); + } public void run() { List inProgress = new ArrayList(); ServerSocketFactory factory = ServerSocketFactory.getDefault(); - ServerSocket ss = null; try { - ss = factory.createServerSocket(61616); + ss = factory.createServerSocket(0); + ss.setSoTimeout(5000); + socketReadyLatch.countDown(); while (!interrupted()) { inProgress.add(ss.accept()); // eat socket } + } catch (java.net.SocketTimeoutException expected) { } catch (Exception e) { e.printStackTrace(); } finally { diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessTest.java index 65f9bfb628..71ab687cdd 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessTest.java @@ -320,7 +320,7 @@ public class DurableSubProcessTest extends org.apache.activemq.TestSupport { "jms.alwaysSyncSend=true&jms.dispatchAsync=true&" + "jms.producerWindowSize=20971520&" + "jms.copyMessageOnSend=false&" + - "initialReconnectDelay=100&maxReconnectDelay=30000&maxReconnectAttempts=0&" + + "initialReconnectDelay=100&maxReconnectDelay=30000&" + "useExponentialBackOff=true"; final ConnectionFactory cf = new ActiveMQConnectionFactory(url); diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessWithRestartTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessWithRestartTest.java index 8cdf327529..d036685b36 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessWithRestartTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessWithRestartTest.java @@ -398,7 +398,7 @@ public class DurableSubProcessWithRestartTest { + "jms.alwaysSyncSend=true&jms.dispatchAsync=true&" + "jms.producerWindowSize=20971520&" + "jms.copyMessageOnSend=false&" - + "initialReconnectDelay=100&maxReconnectDelay=30000&maxReconnectAttempts=0&" + + "initialReconnectDelay=100&maxReconnectDelay=30000&" + "useExponentialBackOff=true"; final ConnectionFactory cf = new ActiveMQConnectionFactory(url);