mirror of https://github.com/apache/activemq.git
resolve some timing issues that caused intermittent failure of the test for: https://issues.apache.org/activemq/browse/AMQ-1855 - also wait a little longer for a close to complete before restarting
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@811371 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f0a6c69d70
commit
b0e3570652
|
@ -377,8 +377,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
||||||
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
if( !sendShutdown.await(5, TimeUnit.SECONDS) ) {
|
if( !sendShutdown.await(10, TimeUnit.SECONDS) ) {
|
||||||
LOG.debug("Network Could not shutdown in a timely manner");
|
LOG.info("Network Could not shutdown in a timely manner");
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
ServiceStopper ss = new ServiceStopper();
|
ServiceStopper ss = new ServiceStopper();
|
||||||
|
|
|
@ -16,10 +16,13 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.transport.discovery;
|
package org.apache.activemq.transport.discovery;
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.util.concurrent.Semaphore;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import javax.management.ObjectInstance;
|
||||||
import javax.management.ObjectName;
|
import javax.management.ObjectName;
|
||||||
|
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
@ -31,8 +34,10 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.jmock.Expectations;
|
import org.jmock.Expectations;
|
||||||
import org.jmock.Mockery;
|
import org.jmock.Mockery;
|
||||||
|
import org.jmock.api.Invocation;
|
||||||
import org.jmock.integration.junit4.JMock;
|
import org.jmock.integration.junit4.JMock;
|
||||||
import org.jmock.integration.junit4.JUnit4Mockery;
|
import org.jmock.integration.junit4.JUnit4Mockery;
|
||||||
|
import org.jmock.lib.action.CustomAction;
|
||||||
import org.jmock.lib.legacy.ClassImposteriser;
|
import org.jmock.lib.legacy.ClassImposteriser;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -51,6 +56,9 @@ public class DiscoveryNetworkReconnectTest {
|
||||||
|
|
||||||
final String groupName = "GroupID-" + "DiscoveryNetworkReconnectTest";
|
final String groupName = "GroupID-" + "DiscoveryNetworkReconnectTest";
|
||||||
final String discoveryAddress = "multicast://default?group=" + groupName + "&initialReconnectDelay=1000";
|
final String discoveryAddress = "multicast://default?group=" + groupName + "&initialReconnectDelay=1000";
|
||||||
|
final Semaphore mbeanRegistered = new Semaphore(0);
|
||||||
|
final Semaphore mbeanUnregistered = new Semaphore(0);
|
||||||
|
|
||||||
|
|
||||||
private DiscoveryAgent agent;
|
private DiscoveryAgent agent;
|
||||||
SocketProxy proxy;
|
SocketProxy proxy;
|
||||||
|
@ -68,15 +76,12 @@ public class DiscoveryNetworkReconnectTest {
|
||||||
brokerA.start();
|
brokerA.start();
|
||||||
|
|
||||||
proxy = new SocketProxy(brokerA.getTransportConnectors().get(0).getConnectUri());
|
proxy = new SocketProxy(brokerA.getTransportConnectors().get(0).getConnectUri());
|
||||||
//new SocketProxy(new URI("tcp://localhost:61617"));
|
|
||||||
|
|
||||||
managementContext = context.mock(ManagementContext.class);
|
managementContext = context.mock(ManagementContext.class);
|
||||||
|
|
||||||
context.checking(new Expectations(){{
|
context.checking(new Expectations(){{
|
||||||
allowing (managementContext).getJmxDomainName(); will (returnValue("Test"));
|
allowing (managementContext).getJmxDomainName(); will (returnValue("Test"));
|
||||||
allowing (managementContext).start();
|
allowing (managementContext).start();
|
||||||
allowing (managementContext).stop();
|
allowing (managementContext).stop();
|
||||||
allowing (managementContext).unregisterMBean(with(any(ObjectName.class)));
|
|
||||||
|
|
||||||
// expected MBeans
|
// expected MBeans
|
||||||
allowing (managementContext).registerMBean(with(any(Object.class)), with(equal(
|
allowing (managementContext).registerMBean(with(any(Object.class)), with(equal(
|
||||||
|
@ -86,10 +91,31 @@ public class DiscoveryNetworkReconnectTest {
|
||||||
allowing (managementContext).registerMBean(with(any(Object.class)), with(equal(
|
allowing (managementContext).registerMBean(with(any(Object.class)), with(equal(
|
||||||
new ObjectName("Test:BrokerName=BrokerNC,Type=Topic,Destination=ActiveMQ.Advisory.Connection"))));
|
new ObjectName("Test:BrokerName=BrokerNC,Type=Topic,Destination=ActiveMQ.Advisory.Connection"))));
|
||||||
|
|
||||||
// due to reconnect we get two registrations
|
|
||||||
atLeast(maxReconnects - 1).of (managementContext).registerMBean(with(any(Object.class)), with(equal(
|
atLeast(maxReconnects - 1).of (managementContext).registerMBean(with(any(Object.class)), with(equal(
|
||||||
new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkBridge,NetworkConnectorName=localhost,Name=localhost/127.0.0.1_"
|
new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkBridge,NetworkConnectorName=localhost,Name=localhost/127.0.0.1_"
|
||||||
+ proxy.getUrl().getPort()))));
|
+ proxy.getUrl().getPort())))); will(new CustomAction("signal register network mbean") {
|
||||||
|
public Object invoke(Invocation invocation) throws Throwable {
|
||||||
|
LOG.info("Mbean Registered: " + invocation.getParameter(0));
|
||||||
|
mbeanRegistered.release();
|
||||||
|
return new ObjectInstance((ObjectName)invocation.getParameter(0), "dscription");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
atLeast(maxReconnects - 1).of (managementContext).unregisterMBean(with(equal(
|
||||||
|
new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkBridge,NetworkConnectorName=localhost,Name=localhost/127.0.0.1_"
|
||||||
|
+ proxy.getUrl().getPort())))); will(new CustomAction("signal unregister network mbean") {
|
||||||
|
public Object invoke(Invocation invocation) throws Throwable {
|
||||||
|
LOG.info("Mbean Unregistered: " + invocation.getParameter(0));
|
||||||
|
mbeanUnregistered.release();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
allowing (managementContext).unregisterMBean(with(equal(
|
||||||
|
new ObjectName("Test:BrokerName=BrokerNC,Type=Broker"))));
|
||||||
|
allowing (managementContext).unregisterMBean(with(equal(
|
||||||
|
new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkConnector,NetworkConnectorName=localhost"))));
|
||||||
|
allowing (managementContext).unregisterMBean(with(equal(
|
||||||
|
new ObjectName("Test:BrokerName=BrokerNC,Type=Topic,Destination=ActiveMQ.Advisory.Connection"))));
|
||||||
}});
|
}});
|
||||||
|
|
||||||
brokerB = new BrokerService();
|
brokerB = new BrokerService();
|
||||||
|
@ -102,6 +128,7 @@ public class DiscoveryNetworkReconnectTest {
|
||||||
public void tearDown() throws Exception {
|
public void tearDown() throws Exception {
|
||||||
brokerA.stop();
|
brokerA.stop();
|
||||||
brokerB.stop();
|
brokerB.stop();
|
||||||
|
proxy.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void configure(BrokerService broker) {
|
private void configure(BrokerService broker) {
|
||||||
|
@ -138,16 +165,18 @@ public class DiscoveryNetworkReconnectTest {
|
||||||
// Wait for connection
|
// Wait for connection
|
||||||
assertTrue("we got a network connection in a timely manner", Wait.waitFor(new Wait.Condition() {
|
assertTrue("we got a network connection in a timely manner", Wait.waitFor(new Wait.Condition() {
|
||||||
public boolean isSatisified() throws Exception {
|
public boolean isSatisified() throws Exception {
|
||||||
return proxy.connections.size() == 1;
|
return proxy.connections.size() >= 1;
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
Thread.sleep(3000);
|
|
||||||
|
|
||||||
// force an inactivity timeout timeout
|
// wait for network connector
|
||||||
|
assertTrue("network connector mbean registered within 1 minute", mbeanRegistered.tryAcquire(60, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
// force an inactivity timeout via the proxy
|
||||||
proxy.pause();
|
proxy.pause();
|
||||||
|
|
||||||
// wait for the inactivity timeout
|
// wait for the inactivity timeout and network shutdown
|
||||||
Thread.sleep(6000);
|
assertTrue("network connector mbean unregistered within 1 minute", mbeanUnregistered.tryAcquire(60, TimeUnit.SECONDS));
|
||||||
|
|
||||||
// let a reconnect succeed
|
// let a reconnect succeed
|
||||||
proxy.goOn();
|
proxy.goOn();
|
||||||
|
|
Loading…
Reference in New Issue