diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index 144e202ecc..f335495b0e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -150,7 +150,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { private final TaskRunnerFactory taskRunnerFactory; private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister(); private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock(); - private BrokerId duplexRemoteBrokerId; + private String duplexNetworkConnectorId; /** * @param connector @@ -946,7 +946,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { serviceLock.writeLock().unlock(); } } - }); + }, "StopAsync:" + transport.getRemoteAddress()); } catch (Throwable t) { LOG.warn("cannot create async transport stopper thread.. not waiting for stop to complete, reason:", t); stopped.countDown(); @@ -1179,25 +1179,32 @@ public class TransportConnection implements Connection, Task, CommandVisitor { // so this TransportConnection is the rear end of a network bridge // We have been requested to create a two way pipe ... try { - // We first look if existing network connection already exists for the same broker Id and network connector name - // It's possible in case of brief network fault to have this transport connector side of the connection always active - // and the duplex network connector side wanting to open a new one - // In this case, the old connection must be broken - BrokerId remoteBrokerId = info.getBrokerId(); - setDuplexRemoteBrokerId(remoteBrokerId); - CopyOnWriteArrayList connections = this.connector.getConnections(); - for (Iterator iter = connections.iterator(); iter.hasNext();) { - TransportConnection c = iter.next(); - if ((c != this) && (remoteBrokerId.equals(c.getDuplexRemoteBrokerId()))) { - LOG.warn("An existing duplex active connection already exists for this broker (" + remoteBrokerId + "). Stopping it."); - c.stop(); - } - } Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties()); Map props = createMap(properties); NetworkBridgeConfiguration config = new NetworkBridgeConfiguration(); IntrospectionSupport.setProperties(config, props, ""); config.setBrokerName(broker.getBrokerName()); + + // check for existing duplex connection hanging about + + // We first look if existing network connection already exists for the same broker Id and network connector name + // It's possible in case of brief network fault to have this transport connector side of the connection always active + // and the duplex network connector side wanting to open a new one + // In this case, the old connection must be broken + String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId(); + CopyOnWriteArrayList connections = this.connector.getConnections(); + synchronized (connections) { + for (Iterator iter = connections.iterator(); iter.hasNext();) { + TransportConnection c = iter.next(); + if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) { + LOG.warn("Stopping an existing active duplex connection [" + c + "] for network connector (" + duplexNetworkConnectorId + ")."); + c.stopAsync(); + // better to wait for a bit rather than get connection id already in use and failure to start new bridge + c.getStopped().await(1, TimeUnit.SECONDS); + } + } + setDuplexNetworkConnectorId(duplexNetworkConnectorId); + } URI uri = broker.getVmConnectorURI(); HashMap map = new HashMap(URISupport.parseParameters(uri)); map.put("network", "true"); @@ -1217,13 +1224,14 @@ public class TransportConnection implements Connection, Task, CommandVisitor { info.setDuplexConnection(false); duplexBridge.setCreatedByDuplex(true); duplexBridge.duplexStart(this, brokerInfo, info); - LOG.info("Created Duplex Bridge back to " + info.getBrokerName()); + LOG.info("Started responder end of duplex bridge " + duplexNetworkConnectorId); return null; } catch (TransportDisposedIOException e) { - LOG.warn("Duplex Bridge back to " + info.getBrokerName() + " was correctly stopped before it was correctly started."); + LOG.warn("Duplex bridge " + duplexNetworkConnectorId + " was stopped before it was correctly started."); return null; } catch (Exception e) { - LOG.error("Creating duplex network bridge", e); + LOG.error("Failed to create responder end of duplex network bridge " + duplexNetworkConnectorId , e); + return null; } } // We only expect to get one broker info command per connection @@ -1415,11 +1423,15 @@ public class TransportConnection implements Connection, Task, CommandVisitor { return connectionStateRegister.lookupConnectionState(connectionId); } - protected synchronized void setDuplexRemoteBrokerId(BrokerId remoteBrokerId) { - this.duplexRemoteBrokerId = remoteBrokerId; + protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConnectorId) { + this.duplexNetworkConnectorId = duplexNetworkConnectorId; } - protected synchronized BrokerId getDuplexRemoteBrokerId() { - return this.duplexRemoteBrokerId; + protected synchronized String getDuplexNetworkConnectorId() { + return this.duplexNetworkConnectorId; + } + + protected CountDownLatch getStopped() { + return stopped; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 0f02d3b70c..318947e564 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -219,8 +219,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br remoteBridgeStarted.set(true); startedLatch.countDown(); LOG.info("Outbound transport to " + remoteBrokerName + " resumed"); - } catch (Exception e) { + } catch (Throwable e) { LOG.error("Caught exception from local start in resume transport", e); + serviceLocalException(e); } } } @@ -248,7 +249,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker); try { startLocalBridge(); - } catch (Exception e) { + } catch (Throwable e) { serviceLocalException(e); } finally { Thread.currentThread().setName(originalName); @@ -273,7 +274,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br }); } - protected void startLocalBridge() throws Exception { + protected void startLocalBridge() throws Throwable { if (localBridgeStarted.compareAndSet(false, true)) { synchronized (this) { if (LOG.isTraceEnabled()) { @@ -284,7 +285,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br if (!disposed.get()) { localConnectionInfo = new ConnectionInfo(); localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); - localClientId = "NC_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName(); + localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName(); localConnectionInfo.setClientId(localClientId); localConnectionInfo.setUserName(configuration.getUserName()); localConnectionInfo.setPassword(configuration.getPassword()); @@ -296,10 +297,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates(); localConnectionInfo.setTransportContext(peerCerts); } - localBroker.oneway(localConnectionInfo); - + // sync requests that may fail + Object resp = localBroker.request(localConnectionInfo); + if (resp instanceof ExceptionResponse) { + throw ((ExceptionResponse)resp).getException(); + } localSessionInfo = new SessionInfo(localConnectionInfo, 1); localBroker.oneway(localSessionInfo); + brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex); NetworkBridgeListener l = this.networkBridgeListener; if (l != null) { @@ -346,7 +351,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } remoteConnectionInfo = new ConnectionInfo(); remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); - remoteConnectionInfo.setClientId("NC_" + configuration.getBrokerName() + "_outbound"); + remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound"); remoteConnectionInfo.setUserName(configuration.getUserName()); remoteConnectionInfo.setPassword(configuration.getPassword()); remoteBroker.oneway(remoteConnectionInfo); @@ -857,7 +862,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br /** * @return Returns the staticallyIncludedDestinations. */ - public ActiveMQDestination[] getStaticallyIncludedDestinations() { + public ActiveMQDestination[] getStaticallyIncludedestinations() { return staticallyIncludedDestinations; } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java index 5d718e8290..18fc08e7dd 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java @@ -232,15 +232,6 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco return configureBridge(result); } - public String getName() { - String name = super.getName(); - if (name == null) { - name = discoveryAgent.toString(); - super.setName(name); - } - return name; - } - @Override public String toString() { return "DiscoveryNetworkConnector:" + getName() + ":" + getBrokerService(); diff --git a/activemq-core/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java index aff596599c..3734dbcd64 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java @@ -269,20 +269,9 @@ public class LdapNetworkConnector context.close(); } - /** - * returns the name of the connector - * - * @return connector name - */ - public String getName() { - - String name = super.getName(); - if (name == null) { - name = this.getClass().getName() + " [" + ldapURI.toString() + "]"; - super.setName(name); - } - return name; - } + public String toString() { + return this.getClass().getName() + getName() + "[" + ldapURI.toString() + "]"; + } /** * add connector of the given URI diff --git a/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java index 74da791adb..1380d14dd2 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java @@ -141,13 +141,9 @@ public class MulticastNetworkConnector extends NetworkConnector { } } - public String getName() { - String name = super.getName(); - if(name == null) { - name = remoteTransport.toString(); - super.setName(name); - } - return name; + @Override + public String toString() { + return getClass().getName() + ":" + getName() + "[" + remoteTransport.toString() + "]"; } protected DemandForwardingBridgeSupport createBridge(Transport local, Transport remote) { diff --git a/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java b/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java index aa8b63d1a2..0c5c494d44 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java @@ -39,7 +39,7 @@ public class NetworkBridgeConfiguration { private String userName; private String password; private String destinationFilter = ">"; - private String name = null; + private String name = "NC"; private List excludedDestinations; private List dynamicallyIncludedDestinations; @@ -223,9 +223,6 @@ public class NetworkBridgeConfiguration { * @return the name */ public String getName() { - if(this.name == null) { - this.name = "localhost"; - } return this.name; } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java index a3b935d38a..7cf1e6f470 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java @@ -210,11 +210,11 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem if (localURI == null) { throw new IllegalStateException("You must configure the 'localURI' property"); } - LOG.info("Network Connector " + getName() + " Started"); + LOG.info("Network Connector " + this + " Started"); } protected void handleStop(ServiceStopper stopper) throws Exception { - LOG.info("Network Connector " + getName() + " Stopped"); + LOG.info("Network Connector " + this + " Stopped"); } public ObjectName getObjectName() { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java index 50b3ff7bdb..27f01df7ad 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java @@ -243,7 +243,7 @@ public class InactivityMonitor extends TransportFilter { try { if( failed.get() ) { - throw new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress()); + throw new InactivityIOException("Cannot send, channel has already failed: "+next.getRemoteAddress()); } if (o.getClass() == WireFormatInfo.class) { synchronized (this) { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java index 99e7cbd917..a60da5e25a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java @@ -134,4 +134,8 @@ public class VMTransportServer implements TransportServer { public InetSocketAddress getSocketAddress() { return null; } + + public int getConnectionCount() { + return connectionCount.intValue(); + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java index 5d76001604..48efa68980 100755 --- a/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java @@ -107,7 +107,7 @@ public class DiscoveryNetworkReconnectTest { allowing (managementContext).registerMBean(with(any(Object.class)), with(equal( new ObjectName("Test:BrokerName=BrokerNC,Type=Broker")))); allowing (managementContext).registerMBean(with(any(Object.class)), with(equal( - new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkConnector,NetworkConnectorName=localhost")))); + new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkConnector,NetworkConnectorName=NC")))); allowing (managementContext).registerMBean(with(any(Object.class)), with(equal( new ObjectName("Test:BrokerName=BrokerNC,Type=Topic,Destination=ActiveMQ.Advisory.Connection")))); allowing (managementContext).registerMBean(with(any(Object.class)), with(equal( @@ -116,7 +116,7 @@ public class DiscoveryNetworkReconnectTest { new ObjectName("Test:BrokerName=BrokerNC,Type=jobScheduler,jobSchedulerName=JMS")))); atLeast(maxReconnects - 1).of (managementContext).registerMBean(with(any(Object.class)), with(new NetworkBridgeObjectNameMatcher( - new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkBridge,NetworkConnectorName=localhost,Name=localhost/127.0.0.1_" + new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkBridge,NetworkConnectorName=NC,Name=localhost/127.0.0.1_" + proxy.getUrl().getPort())))); will(new CustomAction("signal register network mbean") { public Object invoke(Invocation invocation) throws Throwable { LOG.info("Mbean Registered: " + invocation.getParameter(0)); @@ -125,7 +125,7 @@ public class DiscoveryNetworkReconnectTest { } }); atLeast(maxReconnects - 1).of (managementContext).unregisterMBean(with(new NetworkBridgeObjectNameMatcher( - new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkBridge,NetworkConnectorName=localhost,Name=localhost/127.0.0.1_" + new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkBridge,NetworkConnectorName=NC,Name=localhost/127.0.0.1_" + proxy.getUrl().getPort())))); will(new CustomAction("signal unregister network mbean") { public Object invoke(Invocation invocation) throws Throwable { LOG.info("Mbean Unregistered: " + invocation.getParameter(0)); @@ -137,7 +137,7 @@ public class DiscoveryNetworkReconnectTest { allowing (managementContext).unregisterMBean(with(equal( new ObjectName("Test:BrokerName=BrokerNC,Type=Broker")))); allowing (managementContext).unregisterMBean(with(equal( - new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkConnector,NetworkConnectorName=localhost")))); + new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkConnector,NetworkConnectorName=NC")))); allowing (managementContext).unregisterMBean(with(equal( new ObjectName("Test:BrokerName=BrokerNC,Type=Topic,Destination=ActiveMQ.Advisory.Connection")))); allowing (managementContext).unregisterMBean(with(equal( diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java index ffbbba2229..5b4980b13a 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java @@ -18,7 +18,7 @@ package org.apache.activemq.usecases; import java.net.URI; import java.util.List; - +import java.util.concurrent.TimeUnit; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.TextMessage; @@ -26,23 +26,34 @@ import javax.jms.TextMessage; import junit.framework.Test; import org.apache.activemq.JmsMultipleBrokersTestSupport; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerPluginSupport; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.network.DiscoveryNetworkConnector; import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.transport.vm.VMTransportFactory; import org.apache.activemq.util.MessageIdList; import org.apache.activemq.util.SocketProxy; +import org.apache.activemq.util.Wait; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; public class BrokerQueueNetworkWithDisconnectTest extends JmsMultipleBrokersTestSupport { + private static final Log LOG = LogFactory.getLog(BrokerQueueNetworkWithDisconnectTest.class); private static final int NETWORK_DOWN_TIME = 5000; protected static final int MESSAGE_COUNT = 200; private static final String HUB = "HubBroker"; private static final String SPOKE = "SpokeBroker"; private SocketProxy socketProxy; private long networkDownTimeStart; - public boolean useDuplexNetworkBridge; + public boolean useDuplexNetworkBridge = true; public boolean sumulateStalledNetwork; + private long inactiveDuration = 1000; + private boolean useSocketProxy = true; public void initCombosForTestSendOnAReceiveOnBWithTransportDisconnect() { @@ -75,6 +86,80 @@ public class BrokerQueueNetworkWithDisconnectTest extends JmsMultipleBrokersTest MESSAGE_COUNT <= msgs.getMessageCount()); } + public void testNoStuckConnectionsWithTransportDisconnect() throws Exception { + inactiveDuration=60000l; + useDuplexNetworkBridge = true; + + bridgeBrokers(SPOKE, HUB); + + final BrokerItem hub = brokers.get(HUB); + hub.broker.setPlugins(new BrokerPlugin[]{ + new BrokerPluginSupport() { + int sleepCount = 2; + @Override + public void removeConnection(ConnectionContext context, + ConnectionInfo info, Throwable error) + throws Exception { + try { + while(--sleepCount >= 0) { + LOG.info("sleeping for a bit in close impl to simulate load where reconnect fails due to a pending close"); + TimeUnit.SECONDS.sleep(2); + } + } catch (Exception ignored) {} + super.removeConnection(context, info, error); + } + } + }); + startAllBrokers(); + waitForBridgeFormation(); + + // kill the initiator side, leaving remote end intact + // simulate async network breakage + // remote side will need to spot duplicate network and stop/kill the original + for (int i=0; i< 3; i++) { + socketProxy.halfClose(); + sleep(10000); + } + // wait for full reformation of bridge + // verify no extra connections + boolean allGood = Wait.waitFor(new Wait.Condition(){ + public boolean isSatisified() throws Exception { + long numConnections = hub.broker.getTransportConnectors().get(0).getConnections().size(); + LOG.info("Num connetions:" + numConnections); + return numConnections == 1; + }}); + if (!allGood) { + dumpAllThreads("ExtraHubConnection"); + } + assertTrue("should be only one transport connection for the single duplex network connector", allGood); + + allGood = Wait.waitFor(new Wait.Condition(){ + public boolean isSatisified() throws Exception { + long numVmConnections = VMTransportFactory.SERVERS.get(HUB).getConnectionCount(); + LOG.info("Num VM connetions:" + numVmConnections); + return numVmConnections == 1; + }}); + if (!allGood) { + dumpAllThreads("ExtraHubVMConnection"); + } + assertTrue("should be only one vm connection for the single network duplex network connector", allGood); + } + + public void testTwoDuplexNCsAreAllowed() throws Exception { + useDuplexNetworkBridge = true; + useSocketProxy = false; + + NetworkConnector connector = bridgeBrokers(SPOKE, HUB); + connector.setName("FirstDuplex"); + connector = bridgeBrokers(SPOKE, HUB); + connector.setName("SecondDuplex"); + + startAllBrokers(); + waitForBridgeFormation(); + + BrokerItem hub = brokers.get(HUB); + assertEquals("Has two transport Connectors", 2, hub.broker.getTransportConnectors().get(0).getConnections().size()); + } @Override protected void startAllBrokers() throws Exception { @@ -88,6 +173,8 @@ public class BrokerQueueNetworkWithDisconnectTest extends JmsMultipleBrokersTest public void setUp() throws Exception { networkDownTimeStart = 0; + inactiveDuration = 1000; + useSocketProxy = true; super.setAutoFail(true); super.setUp(); final String options = "?persistent=true&useJmx=false&deleteAllMessagesOnStartup=true"; @@ -95,6 +182,13 @@ public class BrokerQueueNetworkWithDisconnectTest extends JmsMultipleBrokersTest createBroker(new URI("broker:(tcp://localhost:61616)/" + SPOKE + options)); } + public void tearDown() throws Exception { + super.tearDown(); + if (socketProxy != null) { + socketProxy.close(); + } + } + public static Test suite() { return suite(BrokerQueueNetworkWithDisconnectTest.class); } @@ -133,16 +227,18 @@ public class BrokerQueueNetworkWithDisconnectTest extends JmsMultipleBrokersTest } } - @Override protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean conduit, boolean failover) throws Exception { List transportConnectors = remoteBroker.getTransportConnectors(); URI remoteURI; if (!transportConnectors.isEmpty()) { remoteURI = ((TransportConnector)transportConnectors.get(0)).getConnectUri(); - socketProxy = new SocketProxy(remoteURI); - DiscoveryNetworkConnector connector = new DiscoveryNetworkConnector(new URI("static:(" + socketProxy.getUrl() - + "?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000)?useExponentialBackOff=false")); + if (useSocketProxy) { + socketProxy = new SocketProxy(remoteURI); + remoteURI = socketProxy.getUrl(); + } + DiscoveryNetworkConnector connector = new DiscoveryNetworkConnector(new URI("static:(" + remoteURI + + "?wireFormat.maxInactivityDuration=" + inactiveDuration + "&wireFormat.maxInactivityDurationInitalDelay=" + inactiveDuration + ")?useExponentialBackOff=false")); connector.setDynamicOnly(dynamicOnly); connector.setNetworkTTL(networkTTL); localBroker.addNetworkConnector(connector); @@ -154,7 +250,5 @@ public class BrokerQueueNetworkWithDisconnectTest extends JmsMultipleBrokersTest } else { throw new Exception("Remote broker has no registered connectors."); } - } - } diff --git a/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java b/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java index 30c8db3f11..a7653bd0e3 100644 --- a/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java +++ b/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java @@ -50,7 +50,7 @@ public class SocketProxy { private CountDownLatch closed = new CountDownLatch(1); - public List connections = new LinkedList(); + public List connections = new LinkedList(); private int listenPort = 0; @@ -102,18 +102,33 @@ public class SocketProxy { * close all proxy connections and acceptor */ public void close() { - List connections; + List connections; synchronized(this.connections) { - connections = new ArrayList(this.connections); + connections = new ArrayList(this.connections); } LOG.info("close, numConnectons=" + connections.size()); - for (Connection con : connections) { + for (Bridge con : connections) { closeConnection(con); } acceptor.close(); closed.countDown(); } + /* + * close all proxy receive connections, leaving acceptor + * open + */ + public void halfClose() { + List connections; + synchronized(this.connections) { + connections = new ArrayList(this.connections); + } + LOG.info("halfClose, numConnectons=" + connections.size()); + for (Bridge con : connections) { + halfCloseConnection(con); + } + } + public boolean waitUntilClosed(long timeoutSeconds) throws InterruptedException { return closed.await(timeoutSeconds, TimeUnit.SECONDS); } @@ -138,7 +153,7 @@ public class SocketProxy { synchronized(connections) { LOG.info("pause, numConnectons=" + connections.size()); acceptor.pause(); - for (Connection con : connections) { + for (Bridge con : connections) { con.pause(); } } @@ -150,14 +165,14 @@ public class SocketProxy { public void goOn() { synchronized(connections) { LOG.info("goOn, numConnectons=" + connections.size()); - for (Connection con : connections) { + for (Bridge con : connections) { con.goOn(); } } acceptor.goOn(); } - private void closeConnection(Connection c) { + private void closeConnection(Bridge c) { try { c.close(); } catch (Exception e) { @@ -165,20 +180,28 @@ public class SocketProxy { } } + private void halfCloseConnection(Bridge c) { + try { + c.halfClose(); + } catch (Exception e) { + LOG.debug("exception on half close of: " + c, e); + } + } + private URI urlFromSocket(URI uri, ServerSocket serverSocket) throws Exception { int listenPort = serverSocket.getLocalPort(); return new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), listenPort, uri.getPath(), uri.getQuery(), uri.getFragment()); } - public class Connection { + public class Bridge { private Socket receiveSocket; private Socket sendSocket; private Pump requestThread; private Pump responseThread; - public Connection(Socket socket, URI target) throws Exception { + public Bridge(Socket socket, URI target) throws Exception { receiveSocket = socket; sendSocket = new Socket(); if (receiveBufferSize > 0) { @@ -207,10 +230,14 @@ public class SocketProxy { sendSocket.close(); } + public void halfClose() throws Exception { + receiveSocket.close(); + } + private void linkWithThreads(Socket source, Socket dest) { requestThread = new Pump(source, dest); - responseThread = new Pump(dest, source); requestThread.start(); + responseThread = new Pump(dest, source); responseThread.start(); } @@ -252,12 +279,15 @@ public class SocketProxy { } catch (Exception e) { LOG.debug("read/write failed, reason: " + e.getLocalizedMessage()); try { - close(); + if (!receiveSocket.isClosed()) { + // for halfClose, on read/write failure if we close the + // remote end will see a close at the same time. + close(); + } } catch (Exception ignore) { } } } - } } @@ -293,14 +323,13 @@ public class SocketProxy { pause.get().await(); try { Socket source = socket.accept(); - LOG.info("accepted " + source + ", receiveBufferSize:" + source.getReceiveBufferSize()); pause.get().await(); if (receiveBufferSize > 0) { source.setReceiveBufferSize(receiveBufferSize); } LOG.info("accepted " + source + ", receiveBufferSize:" + source.getReceiveBufferSize()); synchronized(connections) { - connections.add(new Connection(source, target)); + connections.add(new Bridge(source, target)); } } catch (SocketTimeoutException expected) { }