resolve https://issues.apache.org/activemq/browse/AMQ-1855 - effects multicast discovery also; disposed is now atomic and the bridge.stop() waits for shutdown info to be sent before closing the transport, this will alleviate the InvalidClientIDException. bridges are now only remembered after they start which alleviates the retry problem with a failed bridge remaining, added test that exercises the code but which does not demonstrate the behavour on a dual core

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@808890 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-08-28 14:16:29 +00:00
parent 868965997c
commit 5d42b72218
6 changed files with 94 additions and 68 deletions

View File

@ -70,7 +70,7 @@ public class CompositeDemandForwardingBridge extends DemandForwardingBridgeSuppo
ServiceSupport.dispose(this);
}
}
if (!disposed) {
if (!disposed.get()) {
triggerLocalStartBridge();
}
}

View File

@ -103,7 +103,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected int demandConsumerDispatched;
protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
protected boolean disposed;
protected AtomicBoolean disposed = new AtomicBoolean();
protected BrokerId localBrokerId;
protected ActiveMQDestination[] excludedDestinations;
protected ActiveMQDestination[] dynamicallyIncludedDestinations;
@ -281,7 +281,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
localConnectionInfo = new ConnectionInfo();
localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
localClientId = "NC_" + remoteBrokerName + "_inbound" + configuration.getBrokerName();
localClientId = "NC_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName();
localConnectionInfo.setClientId(localClientId);
localConnectionInfo.setUserName(configuration.getUserName());
localConnectionInfo.setPassword(configuration.getPassword());
@ -345,7 +345,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize());
remoteBroker.oneway(demandConsumerInfo);
startedLatch.countDown();
if (!disposed) {
if (!disposed.get()) {
triggerLocalStartBridge();
}
}
@ -354,37 +354,36 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
public void stop() throws Exception {
if (started.compareAndSet(true, false)) {
LOG.debug(" stopping " + configuration.getBrokerName() + " bridge to " + remoteBrokerName + " is disposed already ? " + disposed);
boolean wasDisposedAlready = disposed;
if (!disposed) {
if (disposed.compareAndSet(false, true)) {
LOG.debug(" stopping " + configuration.getBrokerName() + " bridge to " + remoteBrokerName);
NetworkBridgeListener l = this.networkBridgeListener;
if (l != null) {
l.onStop(this);
}
try {
disposed = true;
remoteBridgeStarted.set(false);
final CountDownLatch sendShutdown = new CountDownLatch(1);
ASYNC_TASKS.execute(new Runnable() {
public void run() {
try {
localBroker.oneway(new ShutdownInfo());
sendShutdown.countDown();
remoteBroker.oneway(new ShutdownInfo());
} catch (Throwable e) {
LOG.debug("Caught exception sending shutdown", e);
}finally {
} finally {
sendShutdown.countDown();
}
}
});
if( !sendShutdown.await(100, TimeUnit.MILLISECONDS) ) {
if( !sendShutdown.await(5, TimeUnit.SECONDS) ) {
LOG.debug("Network Could not shutdown in a timely manner");
}
} finally {
ServiceStopper ss = new ServiceStopper();
ss.stop(localBroker);
ss.stop(remoteBroker);
ss.stop(localBroker);
// Release the started Latch since another thread could be
// stuck waiting for it to start up.
startedLatch.countDown();
@ -393,16 +392,12 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
ss.throwFirstException();
}
}
if (wasDisposedAlready) {
LOG.debug(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped");
} else {
LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped");
}
LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped");
}
}
public void serviceRemoteException(Throwable error) {
if (!disposed) {
if (!disposed.get()) {
if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
} else {
@ -419,7 +414,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
protected void serviceRemoteCommand(Command command) {
if (!disposed) {
if (!disposed.get()) {
try {
if (command.isMessageDispatch()) {
waitStarted();
@ -606,7 +601,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
public void serviceLocalException(Throwable error) {
if (!disposed) {
if (!disposed.get()) {
LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error);
LOG.debug("The local Exception was:" + error, error);
ASYNC_TASKS.execute(new Runnable() {
@ -652,7 +647,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
protected void serviceLocalCommand(Command command) {
if (!disposed) {
if (!disposed.get()) {
try {
if (command.isMessageDispatch()) {
enqueueCounter.incrementAndGet();

View File

@ -126,9 +126,9 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
SslContext.setCurrentSslContext(null);
}
NetworkBridge bridge = createBridge(localTransport, remoteTransport, event);
bridges.put(uri, bridge);
try {
bridge.start();
bridges.put(uri, bridge);
} catch (Exception e) {
ServiceSupport.dispose(localTransport);
ServiceSupport.dispose(remoteTransport);

View File

@ -161,7 +161,6 @@ public class SimpleDiscoveryAgent implements DiscoveryAgent {
event.connectTime = System.currentTimeMillis();
event.failed.set(false);
listener.onServiceAdd(event);
}
});

View File

@ -34,6 +34,7 @@ import org.jmock.Mockery;
import org.jmock.integration.junit4.JMock;
import org.jmock.integration.junit4.JUnit4Mockery;
import org.jmock.lib.legacy.ClassImposteriser;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -43,16 +44,17 @@ import org.junit.runner.RunWith;
public class DiscoveryNetworkReconnectTest {
private static final Log LOG = LogFactory.getLog(DiscoveryNetworkReconnectTest.class);
final int maxReconnects = 5;
BrokerService brokerA, brokerB;
Mockery context;
ManagementContext managementContext;
final String groupName = "GroupID-" + "DiscoveryNetworkReconnectTest";
final String discoveryAddress = "multicast://default?group=" + groupName + "&initialReconnectDelay=600";
final String discoveryAddress = "multicast://default?group=" + groupName + "&initialReconnectDelay=1000";
private DiscoveryAgent agent;
SocketProxy proxy;
@Before
public void setUp() throws Exception {
context = new JUnit4Mockery() {{
@ -64,22 +66,10 @@ public class DiscoveryNetworkReconnectTest {
configure(brokerA);
brokerA.addConnector("tcp://localhost:0");
brokerA.start();
}
private void configure(BrokerService broker) {
broker.setPersistent(false);
broker.setUseJmx(true);
}
proxy = new SocketProxy(brokerA.getTransportConnectors().get(0).getConnectUri());
//new SocketProxy(new URI("tcp://localhost:61617"));
@Test
public void testReconnect() throws Exception {
final SocketProxy proxy = new SocketProxy(brokerA.getTransportConnectors().get(0).getConnectUri());
// control multicast publish advertise agent to inject proxy
agent = MulticastDiscoveryAgentFactory.createDiscoveryAgent(new URI(discoveryAddress));
agent.registerService(proxy.getUrl().toString());
agent.start();
managementContext = context.mock(ManagementContext.class);
context.checking(new Expectations(){{
@ -97,40 +87,70 @@ public class DiscoveryNetworkReconnectTest {
new ObjectName("Test:BrokerName=BrokerNC,Type=Topic,Destination=ActiveMQ.Advisory.Connection"))));
// due to reconnect we get two registrations
atLeast(2).of (managementContext).registerMBean(with(any(Object.class)), with(equal(
atLeast(maxReconnects - 1).of (managementContext).registerMBean(with(any(Object.class)), with(equal(
new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkBridge,NetworkConnectorName=localhost,Name=localhost/127.0.0.1_"
+ proxy.getUrl().getPort()))));
}});
brokerB = new BrokerService();
brokerB.setManagementContext(managementContext);
brokerB.setBrokerName("BrokerNC");
configure(brokerB);
brokerB.addNetworkConnector(discoveryAddress + "&wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000&trace=true");
brokerB.start();
configure(brokerB);
}
Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return proxy.connections.size() == 1;
}
});
// force an inactivity timeout timeout
proxy.pause();
// wait for the inactivity timeout
Thread.sleep(2000);
// let a reconnect succeed
proxy.goOn();
assertTrue("got a reconnect", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return proxy.connections.size() == 1;
}
}));
@After
public void tearDown() throws Exception {
brokerA.stop();
brokerB.stop();
// let mockery validate minimal duplicate mbean registrations
}
private void configure(BrokerService broker) {
broker.setPersistent(false);
broker.setUseJmx(true);
}
@Test
public void testMulicastReconnect() throws Exception {
// control multicast advertise agent to inject proxy
agent = MulticastDiscoveryAgentFactory.createDiscoveryAgent(new URI(discoveryAddress));
agent.registerService(proxy.getUrl().toString());
agent.start();
brokerB.addNetworkConnector(discoveryAddress + "&wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000");
brokerB.start();
doReconnect();
}
@Test
public void testSimpleReconnect() throws Exception {
brokerB.addNetworkConnector("simple://(" + proxy.getUrl()
+ ")?useExponentialBackOff=false&initialReconnectDelay=500&wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000");
brokerB.start();
doReconnect();
}
private void doReconnect() throws Exception {
for (int i=0; i<maxReconnects; i++) {
// Wait for connection
assertTrue("we got a network connection in a timely manner", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return proxy.connections.size() == 1;
}
}));
Thread.sleep(1000);
// force an inactivity timeout timeout
proxy.pause();
// wait for the inactivity timeout
Thread.sleep(3000);
// let a reconnect succeed
proxy.goOn();
}
}
}

View File

@ -51,6 +51,16 @@ public class URISupportTest extends TestCase {
assertEquals(2, data.getComponents().length);
}
public void testCompositeWithComponentParam() throws Exception {
CompositeData data = URISupport.parseComposite(new URI("test:(part1://host?part1=true)?outside=true"));
assertEquals(1, data.getComponents().length);
assertEquals(1, data.getParameters().size());
Map part1Params = URISupport.parseParamters(data.getComponents()[0]);
assertEquals(1, part1Params.size());
assertTrue(part1Params.containsKey("part1"));
}
public void testParsingURI() throws Exception {
URI source = new URI("tcp://localhost:61626/foo/bar?cheese=Edam&x=123");
@ -70,7 +80,9 @@ public class URISupportTest extends TestCase {
}
public void testParsingCompositeURI() throws URISyntaxException {
URISupport.parseComposite(new URI("broker://(tcp://localhost:61616)?name=foo"));
CompositeData data = URISupport.parseComposite(new URI("broker://(tcp://localhost:61616)?name=foo"));
assertEquals("one component", 1, data.getComponents().length);
assertEquals("Size: " + data.getParameters(), 1, data.getParameters().size());
}
public void testCheckParenthesis() throws Exception {