https://issues.apache.org/jira/browse/AMQ-3176 - Potential deadlock in duplex network connector recreation, resulting in dangling connections. https://issues.apache.org/jira/browse/AMQ-3129 - Can only have one duplex networkConnection per transportConnection. Rework of https://issues.apache.org/jira/browse/AMQ-2774. NetworkConnector.name attribute (default NC) is used to differenciate duplex network connectors so it needs to be unique to allow multiple to be configured. Removing existing matching network connectors on a reconnect is now thread safe

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1069339 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-02-10 11:02:48 +00:00
parent 3d82d74862
commit 27d0ff4011
12 changed files with 211 additions and 94 deletions

View File

@ -150,7 +150,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
private final TaskRunnerFactory taskRunnerFactory; private final TaskRunnerFactory taskRunnerFactory;
private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister(); private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock(); private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
private BrokerId duplexRemoteBrokerId; private String duplexNetworkConnectorId;
/** /**
* @param connector * @param connector
@ -946,7 +946,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
serviceLock.writeLock().unlock(); serviceLock.writeLock().unlock();
} }
} }
}); }, "StopAsync:" + transport.getRemoteAddress());
} catch (Throwable t) { } catch (Throwable t) {
LOG.warn("cannot create async transport stopper thread.. not waiting for stop to complete, reason:", t); LOG.warn("cannot create async transport stopper thread.. not waiting for stop to complete, reason:", t);
stopped.countDown(); stopped.countDown();
@ -1179,25 +1179,32 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
// so this TransportConnection is the rear end of a network bridge // so this TransportConnection is the rear end of a network bridge
// We have been requested to create a two way pipe ... // We have been requested to create a two way pipe ...
try { try {
// We first look if existing network connection already exists for the same broker Id and network connector name
// It's possible in case of brief network fault to have this transport connector side of the connection always active
// and the duplex network connector side wanting to open a new one
// In this case, the old connection must be broken
BrokerId remoteBrokerId = info.getBrokerId();
setDuplexRemoteBrokerId(remoteBrokerId);
CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections();
for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext();) {
TransportConnection c = iter.next();
if ((c != this) && (remoteBrokerId.equals(c.getDuplexRemoteBrokerId()))) {
LOG.warn("An existing duplex active connection already exists for this broker (" + remoteBrokerId + "). Stopping it.");
c.stop();
}
}
Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties()); Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
Map<String, String> props = createMap(properties); Map<String, String> props = createMap(properties);
NetworkBridgeConfiguration config = new NetworkBridgeConfiguration(); NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
IntrospectionSupport.setProperties(config, props, ""); IntrospectionSupport.setProperties(config, props, "");
config.setBrokerName(broker.getBrokerName()); config.setBrokerName(broker.getBrokerName());
// check for existing duplex connection hanging about
// We first look if existing network connection already exists for the same broker Id and network connector name
// It's possible in case of brief network fault to have this transport connector side of the connection always active
// and the duplex network connector side wanting to open a new one
// In this case, the old connection must be broken
String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId();
CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections();
synchronized (connections) {
for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext();) {
TransportConnection c = iter.next();
if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) {
LOG.warn("Stopping an existing active duplex connection [" + c + "] for network connector (" + duplexNetworkConnectorId + ").");
c.stopAsync();
// better to wait for a bit rather than get connection id already in use and failure to start new bridge
c.getStopped().await(1, TimeUnit.SECONDS);
}
}
setDuplexNetworkConnectorId(duplexNetworkConnectorId);
}
URI uri = broker.getVmConnectorURI(); URI uri = broker.getVmConnectorURI();
HashMap<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri)); HashMap<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
map.put("network", "true"); map.put("network", "true");
@ -1217,13 +1224,14 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
info.setDuplexConnection(false); info.setDuplexConnection(false);
duplexBridge.setCreatedByDuplex(true); duplexBridge.setCreatedByDuplex(true);
duplexBridge.duplexStart(this, brokerInfo, info); duplexBridge.duplexStart(this, brokerInfo, info);
LOG.info("Created Duplex Bridge back to " + info.getBrokerName()); LOG.info("Started responder end of duplex bridge " + duplexNetworkConnectorId);
return null; return null;
} catch (TransportDisposedIOException e) { } catch (TransportDisposedIOException e) {
LOG.warn("Duplex Bridge back to " + info.getBrokerName() + " was correctly stopped before it was correctly started."); LOG.warn("Duplex bridge " + duplexNetworkConnectorId + " was stopped before it was correctly started.");
return null; return null;
} catch (Exception e) { } catch (Exception e) {
LOG.error("Creating duplex network bridge", e); LOG.error("Failed to create responder end of duplex network bridge " + duplexNetworkConnectorId , e);
return null;
} }
} }
// We only expect to get one broker info command per connection // We only expect to get one broker info command per connection
@ -1415,11 +1423,15 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return connectionStateRegister.lookupConnectionState(connectionId); return connectionStateRegister.lookupConnectionState(connectionId);
} }
protected synchronized void setDuplexRemoteBrokerId(BrokerId remoteBrokerId) { protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConnectorId) {
this.duplexRemoteBrokerId = remoteBrokerId; this.duplexNetworkConnectorId = duplexNetworkConnectorId;
} }
protected synchronized BrokerId getDuplexRemoteBrokerId() { protected synchronized String getDuplexNetworkConnectorId() {
return this.duplexRemoteBrokerId; return this.duplexNetworkConnectorId;
}
protected CountDownLatch getStopped() {
return stopped;
} }
} }

View File

@ -219,8 +219,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
remoteBridgeStarted.set(true); remoteBridgeStarted.set(true);
startedLatch.countDown(); startedLatch.countDown();
LOG.info("Outbound transport to " + remoteBrokerName + " resumed"); LOG.info("Outbound transport to " + remoteBrokerName + " resumed");
} catch (Exception e) { } catch (Throwable e) {
LOG.error("Caught exception from local start in resume transport", e); LOG.error("Caught exception from local start in resume transport", e);
serviceLocalException(e);
} }
} }
} }
@ -248,7 +249,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker); Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker);
try { try {
startLocalBridge(); startLocalBridge();
} catch (Exception e) { } catch (Throwable e) {
serviceLocalException(e); serviceLocalException(e);
} finally { } finally {
Thread.currentThread().setName(originalName); Thread.currentThread().setName(originalName);
@ -273,7 +274,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}); });
} }
protected void startLocalBridge() throws Exception { protected void startLocalBridge() throws Throwable {
if (localBridgeStarted.compareAndSet(false, true)) { if (localBridgeStarted.compareAndSet(false, true)) {
synchronized (this) { synchronized (this) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
@ -284,7 +285,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
if (!disposed.get()) { if (!disposed.get()) {
localConnectionInfo = new ConnectionInfo(); localConnectionInfo = new ConnectionInfo();
localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
localClientId = "NC_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName(); localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName();
localConnectionInfo.setClientId(localClientId); localConnectionInfo.setClientId(localClientId);
localConnectionInfo.setUserName(configuration.getUserName()); localConnectionInfo.setUserName(configuration.getUserName());
localConnectionInfo.setPassword(configuration.getPassword()); localConnectionInfo.setPassword(configuration.getPassword());
@ -296,10 +297,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates(); X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates();
localConnectionInfo.setTransportContext(peerCerts); localConnectionInfo.setTransportContext(peerCerts);
} }
localBroker.oneway(localConnectionInfo); // sync requests that may fail
Object resp = localBroker.request(localConnectionInfo);
if (resp instanceof ExceptionResponse) {
throw ((ExceptionResponse)resp).getException();
}
localSessionInfo = new SessionInfo(localConnectionInfo, 1); localSessionInfo = new SessionInfo(localConnectionInfo, 1);
localBroker.oneway(localSessionInfo); localBroker.oneway(localSessionInfo);
brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex); brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex);
NetworkBridgeListener l = this.networkBridgeListener; NetworkBridgeListener l = this.networkBridgeListener;
if (l != null) { if (l != null) {
@ -346,7 +351,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} }
remoteConnectionInfo = new ConnectionInfo(); remoteConnectionInfo = new ConnectionInfo();
remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
remoteConnectionInfo.setClientId("NC_" + configuration.getBrokerName() + "_outbound"); remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound");
remoteConnectionInfo.setUserName(configuration.getUserName()); remoteConnectionInfo.setUserName(configuration.getUserName());
remoteConnectionInfo.setPassword(configuration.getPassword()); remoteConnectionInfo.setPassword(configuration.getPassword());
remoteBroker.oneway(remoteConnectionInfo); remoteBroker.oneway(remoteConnectionInfo);
@ -857,7 +862,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
/** /**
* @return Returns the staticallyIncludedDestinations. * @return Returns the staticallyIncludedDestinations.
*/ */
public ActiveMQDestination[] getStaticallyIncludedDestinations() { public ActiveMQDestination[] getStaticallyIncludedestinations() {
return staticallyIncludedDestinations; return staticallyIncludedDestinations;
} }

View File

@ -232,15 +232,6 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
return configureBridge(result); return configureBridge(result);
} }
public String getName() {
String name = super.getName();
if (name == null) {
name = discoveryAgent.toString();
super.setName(name);
}
return name;
}
@Override @Override
public String toString() { public String toString() {
return "DiscoveryNetworkConnector:" + getName() + ":" + getBrokerService(); return "DiscoveryNetworkConnector:" + getName() + ":" + getBrokerService();

View File

@ -269,19 +269,8 @@ public class LdapNetworkConnector
context.close(); context.close();
} }
/** public String toString() {
* returns the name of the connector return this.getClass().getName() + getName() + "[" + ldapURI.toString() + "]";
*
* @return connector name
*/
public String getName() {
String name = super.getName();
if (name == null) {
name = this.getClass().getName() + " [" + ldapURI.toString() + "]";
super.setName(name);
}
return name;
} }
/** /**

View File

@ -141,13 +141,9 @@ public class MulticastNetworkConnector extends NetworkConnector {
} }
} }
public String getName() { @Override
String name = super.getName(); public String toString() {
if(name == null) { return getClass().getName() + ":" + getName() + "[" + remoteTransport.toString() + "]";
name = remoteTransport.toString();
super.setName(name);
}
return name;
} }
protected DemandForwardingBridgeSupport createBridge(Transport local, Transport remote) { protected DemandForwardingBridgeSupport createBridge(Transport local, Transport remote) {

View File

@ -39,7 +39,7 @@ public class NetworkBridgeConfiguration {
private String userName; private String userName;
private String password; private String password;
private String destinationFilter = ">"; private String destinationFilter = ">";
private String name = null; private String name = "NC";
private List<ActiveMQDestination> excludedDestinations; private List<ActiveMQDestination> excludedDestinations;
private List<ActiveMQDestination> dynamicallyIncludedDestinations; private List<ActiveMQDestination> dynamicallyIncludedDestinations;
@ -223,9 +223,6 @@ public class NetworkBridgeConfiguration {
* @return the name * @return the name
*/ */
public String getName() { public String getName() {
if(this.name == null) {
this.name = "localhost";
}
return this.name; return this.name;
} }

View File

@ -210,11 +210,11 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
if (localURI == null) { if (localURI == null) {
throw new IllegalStateException("You must configure the 'localURI' property"); throw new IllegalStateException("You must configure the 'localURI' property");
} }
LOG.info("Network Connector " + getName() + " Started"); LOG.info("Network Connector " + this + " Started");
} }
protected void handleStop(ServiceStopper stopper) throws Exception { protected void handleStop(ServiceStopper stopper) throws Exception {
LOG.info("Network Connector " + getName() + " Stopped"); LOG.info("Network Connector " + this + " Stopped");
} }
public ObjectName getObjectName() { public ObjectName getObjectName() {

View File

@ -243,7 +243,7 @@ public class InactivityMonitor extends TransportFilter {
try { try {
if( failed.get() ) { if( failed.get() ) {
throw new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress()); throw new InactivityIOException("Cannot send, channel has already failed: "+next.getRemoteAddress());
} }
if (o.getClass() == WireFormatInfo.class) { if (o.getClass() == WireFormatInfo.class) {
synchronized (this) { synchronized (this) {

View File

@ -134,4 +134,8 @@ public class VMTransportServer implements TransportServer {
public InetSocketAddress getSocketAddress() { public InetSocketAddress getSocketAddress() {
return null; return null;
} }
public int getConnectionCount() {
return connectionCount.intValue();
}
} }

View File

@ -107,7 +107,7 @@ public class DiscoveryNetworkReconnectTest {
allowing (managementContext).registerMBean(with(any(Object.class)), with(equal( allowing (managementContext).registerMBean(with(any(Object.class)), with(equal(
new ObjectName("Test:BrokerName=BrokerNC,Type=Broker")))); new ObjectName("Test:BrokerName=BrokerNC,Type=Broker"))));
allowing (managementContext).registerMBean(with(any(Object.class)), with(equal( allowing (managementContext).registerMBean(with(any(Object.class)), with(equal(
new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkConnector,NetworkConnectorName=localhost")))); new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkConnector,NetworkConnectorName=NC"))));
allowing (managementContext).registerMBean(with(any(Object.class)), with(equal( allowing (managementContext).registerMBean(with(any(Object.class)), with(equal(
new ObjectName("Test:BrokerName=BrokerNC,Type=Topic,Destination=ActiveMQ.Advisory.Connection")))); new ObjectName("Test:BrokerName=BrokerNC,Type=Topic,Destination=ActiveMQ.Advisory.Connection"))));
allowing (managementContext).registerMBean(with(any(Object.class)), with(equal( allowing (managementContext).registerMBean(with(any(Object.class)), with(equal(
@ -116,7 +116,7 @@ public class DiscoveryNetworkReconnectTest {
new ObjectName("Test:BrokerName=BrokerNC,Type=jobScheduler,jobSchedulerName=JMS")))); new ObjectName("Test:BrokerName=BrokerNC,Type=jobScheduler,jobSchedulerName=JMS"))));
atLeast(maxReconnects - 1).of (managementContext).registerMBean(with(any(Object.class)), with(new NetworkBridgeObjectNameMatcher<ObjectName>( atLeast(maxReconnects - 1).of (managementContext).registerMBean(with(any(Object.class)), with(new NetworkBridgeObjectNameMatcher<ObjectName>(
new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkBridge,NetworkConnectorName=localhost,Name=localhost/127.0.0.1_" new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkBridge,NetworkConnectorName=NC,Name=localhost/127.0.0.1_"
+ proxy.getUrl().getPort())))); will(new CustomAction("signal register network mbean") { + proxy.getUrl().getPort())))); will(new CustomAction("signal register network mbean") {
public Object invoke(Invocation invocation) throws Throwable { public Object invoke(Invocation invocation) throws Throwable {
LOG.info("Mbean Registered: " + invocation.getParameter(0)); LOG.info("Mbean Registered: " + invocation.getParameter(0));
@ -125,7 +125,7 @@ public class DiscoveryNetworkReconnectTest {
} }
}); });
atLeast(maxReconnects - 1).of (managementContext).unregisterMBean(with(new NetworkBridgeObjectNameMatcher<ObjectName>( atLeast(maxReconnects - 1).of (managementContext).unregisterMBean(with(new NetworkBridgeObjectNameMatcher<ObjectName>(
new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkBridge,NetworkConnectorName=localhost,Name=localhost/127.0.0.1_" new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkBridge,NetworkConnectorName=NC,Name=localhost/127.0.0.1_"
+ proxy.getUrl().getPort())))); will(new CustomAction("signal unregister network mbean") { + proxy.getUrl().getPort())))); will(new CustomAction("signal unregister network mbean") {
public Object invoke(Invocation invocation) throws Throwable { public Object invoke(Invocation invocation) throws Throwable {
LOG.info("Mbean Unregistered: " + invocation.getParameter(0)); LOG.info("Mbean Unregistered: " + invocation.getParameter(0));
@ -137,7 +137,7 @@ public class DiscoveryNetworkReconnectTest {
allowing (managementContext).unregisterMBean(with(equal( allowing (managementContext).unregisterMBean(with(equal(
new ObjectName("Test:BrokerName=BrokerNC,Type=Broker")))); new ObjectName("Test:BrokerName=BrokerNC,Type=Broker"))));
allowing (managementContext).unregisterMBean(with(equal( allowing (managementContext).unregisterMBean(with(equal(
new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkConnector,NetworkConnectorName=localhost")))); new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkConnector,NetworkConnectorName=NC"))));
allowing (managementContext).unregisterMBean(with(equal( allowing (managementContext).unregisterMBean(with(equal(
new ObjectName("Test:BrokerName=BrokerNC,Type=Topic,Destination=ActiveMQ.Advisory.Connection")))); new ObjectName("Test:BrokerName=BrokerNC,Type=Topic,Destination=ActiveMQ.Advisory.Connection"))));
allowing (managementContext).unregisterMBean(with(equal( allowing (managementContext).unregisterMBean(with(equal(

View File

@ -18,7 +18,7 @@ package org.apache.activemq.usecases;
import java.net.URI; import java.net.URI;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.TextMessage; import javax.jms.TextMessage;
@ -26,23 +26,34 @@ import javax.jms.TextMessage;
import junit.framework.Test; import junit.framework.Test;
import org.apache.activemq.JmsMultipleBrokersTestSupport; import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.network.DiscoveryNetworkConnector; import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.transport.vm.VMTransportFactory;
import org.apache.activemq.util.MessageIdList; import org.apache.activemq.util.MessageIdList;
import org.apache.activemq.util.SocketProxy; import org.apache.activemq.util.SocketProxy;
import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class BrokerQueueNetworkWithDisconnectTest extends JmsMultipleBrokersTestSupport { public class BrokerQueueNetworkWithDisconnectTest extends JmsMultipleBrokersTestSupport {
private static final Log LOG = LogFactory.getLog(BrokerQueueNetworkWithDisconnectTest.class);
private static final int NETWORK_DOWN_TIME = 5000; private static final int NETWORK_DOWN_TIME = 5000;
protected static final int MESSAGE_COUNT = 200; protected static final int MESSAGE_COUNT = 200;
private static final String HUB = "HubBroker"; private static final String HUB = "HubBroker";
private static final String SPOKE = "SpokeBroker"; private static final String SPOKE = "SpokeBroker";
private SocketProxy socketProxy; private SocketProxy socketProxy;
private long networkDownTimeStart; private long networkDownTimeStart;
public boolean useDuplexNetworkBridge; public boolean useDuplexNetworkBridge = true;
public boolean sumulateStalledNetwork; public boolean sumulateStalledNetwork;
private long inactiveDuration = 1000;
private boolean useSocketProxy = true;
public void initCombosForTestSendOnAReceiveOnBWithTransportDisconnect() { public void initCombosForTestSendOnAReceiveOnBWithTransportDisconnect() {
@ -75,6 +86,80 @@ public class BrokerQueueNetworkWithDisconnectTest extends JmsMultipleBrokersTest
MESSAGE_COUNT <= msgs.getMessageCount()); MESSAGE_COUNT <= msgs.getMessageCount());
} }
public void testNoStuckConnectionsWithTransportDisconnect() throws Exception {
inactiveDuration=60000l;
useDuplexNetworkBridge = true;
bridgeBrokers(SPOKE, HUB);
final BrokerItem hub = brokers.get(HUB);
hub.broker.setPlugins(new BrokerPlugin[]{
new BrokerPluginSupport() {
int sleepCount = 2;
@Override
public void removeConnection(ConnectionContext context,
ConnectionInfo info, Throwable error)
throws Exception {
try {
while(--sleepCount >= 0) {
LOG.info("sleeping for a bit in close impl to simulate load where reconnect fails due to a pending close");
TimeUnit.SECONDS.sleep(2);
}
} catch (Exception ignored) {}
super.removeConnection(context, info, error);
}
}
});
startAllBrokers();
waitForBridgeFormation();
// kill the initiator side, leaving remote end intact
// simulate async network breakage
// remote side will need to spot duplicate network and stop/kill the original
for (int i=0; i< 3; i++) {
socketProxy.halfClose();
sleep(10000);
}
// wait for full reformation of bridge
// verify no extra connections
boolean allGood = Wait.waitFor(new Wait.Condition(){
public boolean isSatisified() throws Exception {
long numConnections = hub.broker.getTransportConnectors().get(0).getConnections().size();
LOG.info("Num connetions:" + numConnections);
return numConnections == 1;
}});
if (!allGood) {
dumpAllThreads("ExtraHubConnection");
}
assertTrue("should be only one transport connection for the single duplex network connector", allGood);
allGood = Wait.waitFor(new Wait.Condition(){
public boolean isSatisified() throws Exception {
long numVmConnections = VMTransportFactory.SERVERS.get(HUB).getConnectionCount();
LOG.info("Num VM connetions:" + numVmConnections);
return numVmConnections == 1;
}});
if (!allGood) {
dumpAllThreads("ExtraHubVMConnection");
}
assertTrue("should be only one vm connection for the single network duplex network connector", allGood);
}
public void testTwoDuplexNCsAreAllowed() throws Exception {
useDuplexNetworkBridge = true;
useSocketProxy = false;
NetworkConnector connector = bridgeBrokers(SPOKE, HUB);
connector.setName("FirstDuplex");
connector = bridgeBrokers(SPOKE, HUB);
connector.setName("SecondDuplex");
startAllBrokers();
waitForBridgeFormation();
BrokerItem hub = brokers.get(HUB);
assertEquals("Has two transport Connectors", 2, hub.broker.getTransportConnectors().get(0).getConnections().size());
}
@Override @Override
protected void startAllBrokers() throws Exception { protected void startAllBrokers() throws Exception {
@ -88,6 +173,8 @@ public class BrokerQueueNetworkWithDisconnectTest extends JmsMultipleBrokersTest
public void setUp() throws Exception { public void setUp() throws Exception {
networkDownTimeStart = 0; networkDownTimeStart = 0;
inactiveDuration = 1000;
useSocketProxy = true;
super.setAutoFail(true); super.setAutoFail(true);
super.setUp(); super.setUp();
final String options = "?persistent=true&useJmx=false&deleteAllMessagesOnStartup=true"; final String options = "?persistent=true&useJmx=false&deleteAllMessagesOnStartup=true";
@ -95,6 +182,13 @@ public class BrokerQueueNetworkWithDisconnectTest extends JmsMultipleBrokersTest
createBroker(new URI("broker:(tcp://localhost:61616)/" + SPOKE + options)); createBroker(new URI("broker:(tcp://localhost:61616)/" + SPOKE + options));
} }
public void tearDown() throws Exception {
super.tearDown();
if (socketProxy != null) {
socketProxy.close();
}
}
public static Test suite() { public static Test suite() {
return suite(BrokerQueueNetworkWithDisconnectTest.class); return suite(BrokerQueueNetworkWithDisconnectTest.class);
} }
@ -133,16 +227,18 @@ public class BrokerQueueNetworkWithDisconnectTest extends JmsMultipleBrokersTest
} }
} }
@Override @Override
protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean conduit, boolean failover) throws Exception { protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean conduit, boolean failover) throws Exception {
List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors(); List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
URI remoteURI; URI remoteURI;
if (!transportConnectors.isEmpty()) { if (!transportConnectors.isEmpty()) {
remoteURI = ((TransportConnector)transportConnectors.get(0)).getConnectUri(); remoteURI = ((TransportConnector)transportConnectors.get(0)).getConnectUri();
if (useSocketProxy) {
socketProxy = new SocketProxy(remoteURI); socketProxy = new SocketProxy(remoteURI);
DiscoveryNetworkConnector connector = new DiscoveryNetworkConnector(new URI("static:(" + socketProxy.getUrl() remoteURI = socketProxy.getUrl();
+ "?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000)?useExponentialBackOff=false")); }
DiscoveryNetworkConnector connector = new DiscoveryNetworkConnector(new URI("static:(" + remoteURI
+ "?wireFormat.maxInactivityDuration=" + inactiveDuration + "&wireFormat.maxInactivityDurationInitalDelay=" + inactiveDuration + ")?useExponentialBackOff=false"));
connector.setDynamicOnly(dynamicOnly); connector.setDynamicOnly(dynamicOnly);
connector.setNetworkTTL(networkTTL); connector.setNetworkTTL(networkTTL);
localBroker.addNetworkConnector(connector); localBroker.addNetworkConnector(connector);
@ -154,7 +250,5 @@ public class BrokerQueueNetworkWithDisconnectTest extends JmsMultipleBrokersTest
} else { } else {
throw new Exception("Remote broker has no registered connectors."); throw new Exception("Remote broker has no registered connectors.");
} }
} }
} }

View File

@ -50,7 +50,7 @@ public class SocketProxy {
private CountDownLatch closed = new CountDownLatch(1); private CountDownLatch closed = new CountDownLatch(1);
public List<Connection> connections = new LinkedList<Connection>(); public List<Bridge> connections = new LinkedList<Bridge>();
private int listenPort = 0; private int listenPort = 0;
@ -102,18 +102,33 @@ public class SocketProxy {
* close all proxy connections and acceptor * close all proxy connections and acceptor
*/ */
public void close() { public void close() {
List<Connection> connections; List<Bridge> connections;
synchronized(this.connections) { synchronized(this.connections) {
connections = new ArrayList<Connection>(this.connections); connections = new ArrayList<Bridge>(this.connections);
} }
LOG.info("close, numConnectons=" + connections.size()); LOG.info("close, numConnectons=" + connections.size());
for (Connection con : connections) { for (Bridge con : connections) {
closeConnection(con); closeConnection(con);
} }
acceptor.close(); acceptor.close();
closed.countDown(); closed.countDown();
} }
/*
* close all proxy receive connections, leaving acceptor
* open
*/
public void halfClose() {
List<Bridge> connections;
synchronized(this.connections) {
connections = new ArrayList<Bridge>(this.connections);
}
LOG.info("halfClose, numConnectons=" + connections.size());
for (Bridge con : connections) {
halfCloseConnection(con);
}
}
public boolean waitUntilClosed(long timeoutSeconds) throws InterruptedException { public boolean waitUntilClosed(long timeoutSeconds) throws InterruptedException {
return closed.await(timeoutSeconds, TimeUnit.SECONDS); return closed.await(timeoutSeconds, TimeUnit.SECONDS);
} }
@ -138,7 +153,7 @@ public class SocketProxy {
synchronized(connections) { synchronized(connections) {
LOG.info("pause, numConnectons=" + connections.size()); LOG.info("pause, numConnectons=" + connections.size());
acceptor.pause(); acceptor.pause();
for (Connection con : connections) { for (Bridge con : connections) {
con.pause(); con.pause();
} }
} }
@ -150,14 +165,14 @@ public class SocketProxy {
public void goOn() { public void goOn() {
synchronized(connections) { synchronized(connections) {
LOG.info("goOn, numConnectons=" + connections.size()); LOG.info("goOn, numConnectons=" + connections.size());
for (Connection con : connections) { for (Bridge con : connections) {
con.goOn(); con.goOn();
} }
} }
acceptor.goOn(); acceptor.goOn();
} }
private void closeConnection(Connection c) { private void closeConnection(Bridge c) {
try { try {
c.close(); c.close();
} catch (Exception e) { } catch (Exception e) {
@ -165,20 +180,28 @@ public class SocketProxy {
} }
} }
private void halfCloseConnection(Bridge c) {
try {
c.halfClose();
} catch (Exception e) {
LOG.debug("exception on half close of: " + c, e);
}
}
private URI urlFromSocket(URI uri, ServerSocket serverSocket) throws Exception { private URI urlFromSocket(URI uri, ServerSocket serverSocket) throws Exception {
int listenPort = serverSocket.getLocalPort(); int listenPort = serverSocket.getLocalPort();
return new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), listenPort, uri.getPath(), uri.getQuery(), uri.getFragment()); return new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), listenPort, uri.getPath(), uri.getQuery(), uri.getFragment());
} }
public class Connection { public class Bridge {
private Socket receiveSocket; private Socket receiveSocket;
private Socket sendSocket; private Socket sendSocket;
private Pump requestThread; private Pump requestThread;
private Pump responseThread; private Pump responseThread;
public Connection(Socket socket, URI target) throws Exception { public Bridge(Socket socket, URI target) throws Exception {
receiveSocket = socket; receiveSocket = socket;
sendSocket = new Socket(); sendSocket = new Socket();
if (receiveBufferSize > 0) { if (receiveBufferSize > 0) {
@ -207,10 +230,14 @@ public class SocketProxy {
sendSocket.close(); sendSocket.close();
} }
public void halfClose() throws Exception {
receiveSocket.close();
}
private void linkWithThreads(Socket source, Socket dest) { private void linkWithThreads(Socket source, Socket dest) {
requestThread = new Pump(source, dest); requestThread = new Pump(source, dest);
responseThread = new Pump(dest, source);
requestThread.start(); requestThread.start();
responseThread = new Pump(dest, source);
responseThread.start(); responseThread.start();
} }
@ -252,12 +279,15 @@ public class SocketProxy {
} catch (Exception e) { } catch (Exception e) {
LOG.debug("read/write failed, reason: " + e.getLocalizedMessage()); LOG.debug("read/write failed, reason: " + e.getLocalizedMessage());
try { try {
if (!receiveSocket.isClosed()) {
// for halfClose, on read/write failure if we close the
// remote end will see a close at the same time.
close(); close();
}
} catch (Exception ignore) { } catch (Exception ignore) {
} }
} }
} }
} }
} }
@ -293,14 +323,13 @@ public class SocketProxy {
pause.get().await(); pause.get().await();
try { try {
Socket source = socket.accept(); Socket source = socket.accept();
LOG.info("accepted " + source + ", receiveBufferSize:" + source.getReceiveBufferSize());
pause.get().await(); pause.get().await();
if (receiveBufferSize > 0) { if (receiveBufferSize > 0) {
source.setReceiveBufferSize(receiveBufferSize); source.setReceiveBufferSize(receiveBufferSize);
} }
LOG.info("accepted " + source + ", receiveBufferSize:" + source.getReceiveBufferSize()); LOG.info("accepted " + source + ", receiveBufferSize:" + source.getReceiveBufferSize());
synchronized(connections) { synchronized(connections) {
connections.add(new Connection(source, target)); connections.add(new Bridge(source, target));
} }
} catch (SocketTimeoutException expected) { } catch (SocketTimeoutException expected) {
} }