diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java b/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java index 256c9d7b93..7bd19c41b5 100644 --- a/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java @@ -89,7 +89,7 @@ public abstract class AbstractInactivityMonitor extends TransportFilter { @Override public void run() { onException(new InactivityIOException( - "Channel was inactive for too (>" + (connectAttemptTimeout) + ") long: " + next.getRemoteAddress())); + "Channel was inactive (no connection attempt made) for too (>" + (connectAttemptTimeout) + ") long: " + next.getRemoteAddress())); } }); } catch (RejectedExecutionException ex) { diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/InactivityMonitor.java b/activemq-client/src/main/java/org/apache/activemq/transport/InactivityMonitor.java index 288ec61eb6..d86699b3b6 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/InactivityMonitor.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/InactivityMonitor.java @@ -46,7 +46,9 @@ public class InactivityMonitor extends AbstractInactivityMonitor { @Override public void start() throws Exception { - startConnectCheckTask(); + if (!isMonitorStarted()) { + startConnectCheckTask(); + } super.start(); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java index 3afa7d13f8..14b2e0fc6a 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.network; +import java.util.concurrent.TimeUnit; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -25,8 +27,11 @@ import javax.jms.TemporaryQueue; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.util.Wait; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class DuplexNetworkTest extends SimpleNetworkTest { + private static final Logger LOG = LoggerFactory.getLogger(DuplexNetworkTest.class); @Override protected String getLocalBrokerURI() { @@ -37,7 +42,7 @@ public class DuplexNetworkTest extends SimpleNetworkTest { protected BrokerService createRemoteBroker() throws Exception { BrokerService broker = new BrokerService(); broker.setBrokerName("remoteBroker"); - broker.addConnector("tcp://localhost:61617"); + broker.addConnector("tcp://localhost:61617?transport.connectAttemptTimeout=2000"); return broker; } @@ -57,4 +62,28 @@ public class DuplexNetworkTest extends SimpleNetworkTest { } })); } + + @Test + public void testStaysUp() throws Exception { + int bridgeIdentity = getBridgeId(); + LOG.info("Bridges: " + bridgeIdentity); + TimeUnit.SECONDS.sleep(5); + assertEquals("Same bridges", bridgeIdentity, getBridgeId()); + } + + private int getBridgeId() { + int id = 0; + while (id == 0) { + try { + id = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next().hashCode(); + } catch (Throwable tryAgainInABit) { + try { + TimeUnit.MILLISECONDS.sleep(500); + } catch (InterruptedException ignored) { + } + } + } + return id; + } + }