diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index c1a949fd07..fe18b6c639 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -1015,9 +1015,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor { ignore.printStackTrace(); } } - if (brokerInfo != null) { - broker.removeBroker(this, brokerInfo); - } } LOG.debug("Connection Stopped: " + getRemoteAddress()); } @@ -1182,7 +1179,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { // so this TransportConnection is the rear end of a network bridge // We have been requested to create a two way pipe ... try { - // We first look if existing network connection already exists for the same broker Id + // We first look if existing network connection already exists for the same broker Id and network connector name // It's possible in case of brief network fault to have this transport connector side of the connection always active // and the duplex network connector side wanting to open a new one // In this case, the old connection must be broken @@ -1234,7 +1231,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor { LOG.warn("Unexpected extra broker info command received: " + info); } this.brokerInfo = info; - broker.addBroker(this, info); networkConnection = true; List connectionStates = listConnectionStates(); for (TransportConnectionState cs : connectionStates) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index bcd2dc45bd..4bd14bc70d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -94,7 +94,7 @@ public class RegionBroker extends EmptyBroker { private final CopyOnWriteArrayList connections = new CopyOnWriteArrayList(); private final Map destinations = new ConcurrentHashMap(); - private final CopyOnWriteArrayList brokerInfos = new CopyOnWriteArrayList(); + private final Map brokerInfos = new HashMap(); private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator(); private BrokerId brokerId; @@ -640,14 +640,25 @@ public class RegionBroker extends EmptyBroker { @Override public synchronized void addBroker(Connection connection, BrokerInfo info) { - brokerInfos.add(info); + BrokerInfo existing = brokerInfos.get(info.getBrokerId()); + if (existing == null) { + existing = info.copy(); + existing.setPeerBrokerInfos(null); + brokerInfos.put(info.getBrokerId(), existing); + } + existing.incrementRefCount(); + LOG.debug(getBrokerName() + " addBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size()); addBrokerInClusterUpdate(); } @Override public synchronized void removeBroker(Connection connection, BrokerInfo info) { if (info != null) { - brokerInfos.remove(info); + BrokerInfo existing = brokerInfos.get(info.getBrokerId()); + if (existing != null && existing.decrementRefCount() == 0) { + brokerInfos.remove(info.getBrokerId()); + } + LOG.debug(getBrokerName() + " removeBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size()); removeBrokerInClusterUpdate(); } } @@ -655,7 +666,7 @@ public class RegionBroker extends EmptyBroker { @Override public synchronized BrokerInfo[] getPeerBrokerInfos() { BrokerInfo[] result = new BrokerInfo[brokerInfos.size()]; - result = brokerInfos.toArray(result); + result = brokerInfos.values().toArray(result); return result; } diff --git a/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java b/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java index 9bd30b9dd6..6ae85de08f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java @@ -45,6 +45,7 @@ public class BrokerInfo extends BaseCommand { long connectionId; String brokerUploadUrl; String networkProperties; + transient int refCount = 0; public BrokerInfo copy() { BrokerInfo copy = new BrokerInfo(); @@ -265,4 +266,15 @@ public class BrokerInfo extends BaseCommand { } return result; } + + public int getRefCount() { + return refCount; + } + + public void incrementRefCount() { + refCount++; + } + public int decrementRefCount() { + return --refCount; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index fc4febff18..0f02d3b70c 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -228,20 +228,12 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br localBroker.start(); remoteBroker.start(); - if (configuration.isDuplex() && duplexInitiatingConnection == null) { - // initiator side of duplex network - remoteBrokerNameKnownLatch.await(); - } if (!disposed.get()) { try { triggerRemoteStartBridge(); } catch (IOException e) { LOG.warn("Caught exception from remote start", e); } - NetworkBridgeListener l = this.networkBridgeListener; - if (l != null) { - l.onStart(this); - } } else { LOG.warn ("Bridge was disposed before the start() method was fully executed."); throw new TransportDisposedIOException(); @@ -309,6 +301,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br localSessionInfo = new SessionInfo(localConnectionInfo, 1); localBroker.oneway(localSessionInfo); brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex); + NetworkBridgeListener l = this.networkBridgeListener; + if (l != null) { + l.onStart(this); + } LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established."); } else { @@ -419,6 +415,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br ss.throwFirstException(); } } + brokerService.getBroker().removeBroker(null, remoteBrokerInfo); brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo); LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped"); remoteBrokerNameKnownLatch.countDown(); @@ -480,6 +477,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br serviceRemoteBrokerInfo(command); // Let the local broker know the remote broker's ID. localBroker.oneway(command); + // new peer broker (a consumer can work with remote broker also) + brokerService.getBroker().addBroker(null, remoteBrokerInfo); } else if (command.getClass() == ConnectionError.class) { ConnectionError ce = (ConnectionError) command; serviceRemoteException(ce.getException()); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index 92cd1d037f..5be05330f1 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -120,6 +120,7 @@ public class FailoverTransport implements CompositeTransport { private SslContext brokerSslContext; private String updateURIsURL = null; private boolean rebalanceUpdateURIs=true; + private boolean doRebalance = false; public FailoverTransport() throws InterruptedIOException { brokerSslContext = SslContext.getCurrentSslContext(); @@ -131,7 +132,7 @@ public class FailoverTransport implements CompositeTransport { boolean buildBackup = true; boolean doReconnect = !disposed; synchronized (backupMutex) { - if (connectedTransport.get() == null && !disposed) { + if ((connectedTransport.get() == null || doRebalance) && !disposed) { result = doReconnect(); buildBackup = false; } @@ -623,7 +624,7 @@ public class FailoverTransport implements CompositeTransport { for (int i = 0; i < u.length; i++) { uris.remove(u[i]); } - reconnect(rebalance); + // rebalance is automatic if any connected to removed/stopped broker } public void add(boolean rebalance, String u) { @@ -643,15 +644,7 @@ public class FailoverTransport implements CompositeTransport { synchronized (reconnectMutex) { if (started) { if (rebalance) { - Transport transport = this.connectedTransport.getAndSet(null); - if (transport != null) { - try { - transport.stop(); - } catch (Exception e) { - LOG.debug("Caught an exception stopping existing transport", e); - } - } - + doRebalance = true; } LOG.debug("Waking up reconnect task"); try { @@ -683,7 +676,7 @@ public class FailoverTransport implements CompositeTransport { if (removed) { l.add(failedConnectTransportURI); } - LOG.debug("urlList connectionList:" + l); + LOG.debug("urlList connectionList:" + l + ", from: " + uris); return l; } @@ -798,13 +791,31 @@ public class FailoverTransport implements CompositeTransport { reconnectMutex.notifyAll(); } - if (connectedTransport.get() != null || disposed || connectionFailure != null) { + if ((connectedTransport.get() != null && !doRebalance) || disposed || connectionFailure != null) { return false; } else { List connectList = getConnectList(); if (connectList.isEmpty()) { failure = new IOException("No uris available to connect to."); } else { + if (doRebalance) { + if (connectList.get(0).equals(connectedTransportURI)) { + // already connected to first in the list, no need to rebalance + doRebalance = false; + return false; + } else { + LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList); + try { + Transport transport = this.connectedTransport.getAndSet(null); + if (transport != null) { + transport.stop(); + } + } catch (Exception e) { + LOG.debug("Caught an exception stopping existing transport for rebalance", e); + } + } + doRebalance = false; + } if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) { reconnectDelay = initialReconnectDelay; } diff --git a/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java index cdcd0c8054..ba0caf1b50 100644 --- a/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java @@ -152,19 +152,23 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport { maxSetupTime = 8000; } - - protected void waitForBridgeFormation() throws Exception { + + protected void waitForBridgeFormation(final int min) throws Exception { for (BrokerItem brokerItem : brokers.values()) { final BrokerService broker = brokerItem.broker; if (!broker.getNetworkConnectors().isEmpty()) { Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { - return !broker.getNetworkConnectors().get(0).activeBridges().isEmpty(); + return (broker.getNetworkConnectors().get(0).activeBridges().size() >= min); }}); } } } + protected void waitForBridgeFormation() throws Exception { + waitForBridgeFormation(1); + } + protected void startAllBrokers() throws Exception { Collection brokerList = brokers.values(); for (Iterator i = brokerList.iterator(); i.hasNext();) { @@ -517,6 +521,7 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport { } broker.stop(); + broker.waitUntilStopped(); consumers.clear(); broker = null; diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java index a866d7a70a..80de199f07 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java @@ -35,31 +35,32 @@ import org.apache.activemq.network.NetworkConnector; public class FailoverClusterTest extends TestCase { -private static final int NUMBER = 10; -private static final String BROKER_A_BIND_ADDRESS = "tcp://0.0.0.0:61616"; -private static final String BROKER_B_BIND_ADDRESS = "tcp://0.0.0.0:61617"; -private static final String CLIENT_URL = "failover://("+BROKER_A_BIND_ADDRESS+")"; -private static final String BROKER_A_NAME = "BROKERA"; -private static final String BROKER_B_NAME = "BROKERB"; -private BrokerService brokerA; -private BrokerService brokerB; -private final Listconnections = new ArrayList(); + private static final int NUMBER = 10; + private static final String BROKER_A_BIND_ADDRESS = "tcp://0.0.0.0:61616"; + private static final String BROKER_B_BIND_ADDRESS = "tcp://0.0.0.0:61617"; + private static final String BROKER_A_NAME = "BROKERA"; + private static final String BROKER_B_NAME = "BROKERB"; + private BrokerService brokerA; + private BrokerService brokerB; + private String clientUrl; + + private final List connections = new ArrayList(); - public void testClusterConnectedAfterClients() throws Exception{ - createClients(); - if (brokerB == null) { - brokerB = createBrokerB(BROKER_B_BIND_ADDRESS); - } - Thread.sleep(3000); - Set set = new HashSet(); - for (ActiveMQConnection c:connections) { - set.add(c.getTransportChannel().getRemoteAddress()); - } - assertTrue(set.size() > 1); - } + public void testClusterConnectedAfterClients() throws Exception { + createClients(); + if (brokerB == null) { + brokerB = createBrokerB(BROKER_B_BIND_ADDRESS); + } + Thread.sleep(3000); + Set set = new HashSet(); + for (ActiveMQConnection c : connections) { + set.add(c.getTransportChannel().getRemoteAddress()); + } + assertTrue(set.size() > 1); + } - public void testClusterURIOptionsStrip() throws Exception{ + public void testClusterURIOptionsStrip() throws Exception { createClients(); if (brokerB == null) { // add in server side only url param, should not be propagated @@ -67,45 +68,44 @@ private final Listconnections = new ArrayList set = new HashSet(); - for (ActiveMQConnection c:connections) { + for (ActiveMQConnection c : connections) { set.add(c.getTransportChannel().getRemoteAddress()); } assertTrue(set.size() > 1); } - - public void testClusterConnectedBeforeClients() throws Exception{ - - if (brokerB == null) { - brokerB = createBrokerB(BROKER_B_BIND_ADDRESS); - } - Thread.sleep(5000); - createClients(); - Thread.sleep(2000); - brokerA.stop(); - Thread.sleep(2000); - - URI brokerBURI = new URI(BROKER_B_BIND_ADDRESS); - for (ActiveMQConnection c:connections) { - String addr = c.getTransportChannel().getRemoteAddress(); - assertTrue(addr.indexOf(""+brokerBURI.getPort()) > 0); - } - } + + public void testClusterConnectedBeforeClients() throws Exception { + + if (brokerB == null) { + brokerB = createBrokerB(BROKER_B_BIND_ADDRESS); + } + Thread.sleep(5000); + createClients(); + Thread.sleep(2000); + brokerA.stop(); + Thread.sleep(2000); + + URI brokerBURI = new URI(BROKER_B_BIND_ADDRESS); + for (ActiveMQConnection c : connections) { + String addr = c.getTransportChannel().getRemoteAddress(); + assertTrue(addr.indexOf("" + brokerBURI.getPort()) > 0); + } + } @Override protected void setUp() throws Exception { if (brokerA == null) { - brokerA = createBrokerA(BROKER_A_BIND_ADDRESS + "?transport.closeAsync=false"); + brokerA = createBrokerA(BROKER_A_BIND_ADDRESS + "?transport.closeAsync=false"); + clientUrl = "failover://(" + brokerA.getTransportConnectors().get(0).getPublishableConnectString() + ")"; } - - } @Override protected void tearDown() throws Exception { - for (Connection c:connections) { + for (Connection c : connections) { c.close(); - } + } if (brokerB != null) { brokerB.stop(); brokerB = null; @@ -115,16 +115,16 @@ private final Listconnections = new ArrayListconnections = new ArrayList brokerList = brokers.values(); + for (Iterator i = brokerList.iterator(); i.hasNext();) { + BrokerService broker = i.next().broker; + List transportConnectors = broker.getTransportConnectors(); + + if (transportConnectors.isEmpty()) { + broker.addConnector(new URI(AUTO_ASSIGN_TRANSPORT)); + transportConnectors = broker.getTransportConnectors(); + } + + TransportConnector transport = transportConnectors.get(0); + if (transport.getDiscoveryUri() == null) { + transport.setDiscoveryUri(new URI("multicast://default?group=" + groupName)); + } + + List networkConnectors = broker.getNetworkConnectors(); + if (networkConnectors.isEmpty()) { + broker.addNetworkConnector("multicast://default?group=" + groupName); + networkConnectors = broker.getNetworkConnectors(); + } + + NetworkConnector nc = networkConnectors.get(0); + nc.setNetworkTTL(ttl); + nc.setSuppressDuplicateQueueSubscriptions(suppressduplicateQueueSubs); + nc.setDecreaseNetworkConsumerPriority(decreasePriority); + } + + // Multicasting may take longer to setup + maxSetupTime = 8000; + } + + protected BrokerService createBroker(String brokerName) throws Exception { + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.setBrokerName(brokerName); + broker.addConnector(new URI(AUTO_ASSIGN_TRANSPORT)); + brokers.put(brokerName, new BrokerItem(broker)); + + return broker; + } + + /* AMQ-3077 Bug */ + public void testBrokers() throws Exception { + int X = 20; + int i; + + LOG.info("Creating X Brokers"); + for (i = 0; i < X; i++) { + createBroker("Broker" + i); + } + + bridgeAllBrokers(); + startAllBrokers(); + waitForBridgeFormation(X-1); + + verifyPeerBrokerInfos(X-1); + + LOG.info("Stopping half the brokers"); + for (i = 0; i < X/2; i++) { + destroyBroker("Broker" + i); + } + + LOG.info("Waiting for complete stop"); + try { + Thread.sleep(10000); + } catch (Exception e) { + } + + verifyPeerBrokerInfos((X/2) -1); + + LOG.info("Recreating first half"); + for (i = 0; i < X/2; i++) { + createBroker("Broker" + i); + } + + bridgeAllBrokers(); + startAllBrokers(); + waitForBridgeFormation(X-1); + + verifyPeerBrokerInfos(X-1); + } + + public void testPeerBrokerCountHalfPeer() throws Exception { + createBroker("A"); + createBroker("B"); + bridgeBrokers("A", "B"); + startAllBrokers(); + verifyPeerBrokerInfo(brokers.get("A"), 1); + verifyPeerBrokerInfo(brokers.get("B"), 0); + } + + public void testPeerBrokerCountHalfPeerTwice() throws Exception { + createBroker("A"); + createBroker("B"); + bridgeBrokers("A", "B"); + bridgeBrokers("A", "B"); + startAllBrokers(); + verifyPeerBrokerInfo(brokers.get("A"), 1); + verifyPeerBrokerInfo(brokers.get("B"), 0); + } + + public void testPeerBrokerCountFullPeer() throws Exception { + createBroker("A"); + createBroker("B"); + bridgeBrokers("A", "B"); + bridgeBrokers("B", "A"); + startAllBrokers(); + verifyPeerBrokerInfo(brokers.get("A"), 1); + verifyPeerBrokerInfo(brokers.get("B"), 1); + } + + public void testPeerBrokerCountFullPeerDuplex() throws Exception { + createBroker("A"); + createBroker("B"); + NetworkConnector nc = bridgeBrokers("A", "B"); + nc.setDuplex(true); + startAllBrokers(); + verifyPeerBrokerInfo(brokers.get("A"), 1); + verifyPeerBrokerInfo(brokers.get("B"), 1); + } + + + private void verifyPeerBrokerInfo(BrokerItem brokerItem, final int max) { + BrokerService broker = brokerItem.broker; + RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker(); + LOG.info("verify infos " + broker.getBrokerName() + ", len: " + regionBroker.getPeerBrokerInfos().length); + for (BrokerInfo info : regionBroker.getPeerBrokerInfos()) { + LOG.info(info.getBrokerName()); + } + assertEquals(broker.getBrokerName(), max, regionBroker.getPeerBrokerInfos().length); + } + + private void verifyPeerBrokerInfos(final int max) { + Collection brokerList = brokers.values(); + for (Iterator i = brokerList.iterator(); i.hasNext();) { + verifyPeerBrokerInfo(i.next(), max); + } + } + + @Override + public void setUp() throws Exception { + super.setAutoFail(true); + super.setUp(); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + ThreadTracker.result(); + } +}