From c7522305732396dbee751ea37e2554f2420908be Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Thu, 13 Jan 2011 14:18:14 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-3077 - ArraysIndexOutOfBoundsException : -32768 in "BrokerService[xxx] Task" thread - brokerInfo and peerBroker infro explosion problems. A peer is a oneway relationship with networks, broker infos were being accumulated in duplicate for each connector and for multiple connectors. The peer broker info was maintained for each which caused the problem marshalling. re: https://issues.apache.org/jira/browse/AMQ-2632 - the configuration is now respected so it can be selectively enabled and rebalance only occurs if we randomly choose an alternative. The nested peer broker info is not propagated in a connection control git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1058577 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/broker/TransportConnection.java | 6 +- .../activemq/broker/region/RegionBroker.java | 19 +- .../apache/activemq/command/BrokerInfo.java | 12 ++ .../DemandForwardingBridgeSupport.java | 15 +- .../transport/failover/FailoverTransport.java | 37 ++-- .../JmsMultipleBrokersTestSupport.java | 11 +- .../failover/FailoverClusterTest.java | 120 +++++------ .../usecases/NetworkOfTwentyBrokersTest.java | 197 ++++++++++++++++++ 8 files changed, 324 insertions(+), 93 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/usecases/NetworkOfTwentyBrokersTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index c1a949fd07..fe18b6c639 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -1015,9 +1015,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor { ignore.printStackTrace(); } } - if (brokerInfo != null) { - broker.removeBroker(this, brokerInfo); - } } LOG.debug("Connection Stopped: " + getRemoteAddress()); } @@ -1182,7 +1179,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { // so this TransportConnection is the rear end of a network bridge // We have been requested to create a two way pipe ... try { - // We first look if existing network connection already exists for the same broker Id + // We first look if existing network connection already exists for the same broker Id and network connector name // It's possible in case of brief network fault to have this transport connector side of the connection always active // and the duplex network connector side wanting to open a new one // In this case, the old connection must be broken @@ -1234,7 +1231,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor { LOG.warn("Unexpected extra broker info command received: " + info); } this.brokerInfo = info; - broker.addBroker(this, info); networkConnection = true; List connectionStates = listConnectionStates(); for (TransportConnectionState cs : connectionStates) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index bcd2dc45bd..4bd14bc70d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -94,7 +94,7 @@ public class RegionBroker extends EmptyBroker { private final CopyOnWriteArrayList connections = new CopyOnWriteArrayList(); private final Map destinations = new ConcurrentHashMap(); - private final CopyOnWriteArrayList brokerInfos = new CopyOnWriteArrayList(); + private final Map brokerInfos = new HashMap(); private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator(); private BrokerId brokerId; @@ -640,14 +640,25 @@ public class RegionBroker extends EmptyBroker { @Override public synchronized void addBroker(Connection connection, BrokerInfo info) { - brokerInfos.add(info); + BrokerInfo existing = brokerInfos.get(info.getBrokerId()); + if (existing == null) { + existing = info.copy(); + existing.setPeerBrokerInfos(null); + brokerInfos.put(info.getBrokerId(), existing); + } + existing.incrementRefCount(); + LOG.debug(getBrokerName() + " addBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size()); addBrokerInClusterUpdate(); } @Override public synchronized void removeBroker(Connection connection, BrokerInfo info) { if (info != null) { - brokerInfos.remove(info); + BrokerInfo existing = brokerInfos.get(info.getBrokerId()); + if (existing != null && existing.decrementRefCount() == 0) { + brokerInfos.remove(info.getBrokerId()); + } + LOG.debug(getBrokerName() + " removeBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size()); removeBrokerInClusterUpdate(); } } @@ -655,7 +666,7 @@ public class RegionBroker extends EmptyBroker { @Override public synchronized BrokerInfo[] getPeerBrokerInfos() { BrokerInfo[] result = new BrokerInfo[brokerInfos.size()]; - result = brokerInfos.toArray(result); + result = brokerInfos.values().toArray(result); return result; } diff --git a/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java b/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java index 9bd30b9dd6..6ae85de08f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java @@ -45,6 +45,7 @@ public class BrokerInfo extends BaseCommand { long connectionId; String brokerUploadUrl; String networkProperties; + transient int refCount = 0; public BrokerInfo copy() { BrokerInfo copy = new BrokerInfo(); @@ -265,4 +266,15 @@ public class BrokerInfo extends BaseCommand { } return result; } + + public int getRefCount() { + return refCount; + } + + public void incrementRefCount() { + refCount++; + } + public int decrementRefCount() { + return --refCount; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index fc4febff18..0f02d3b70c 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -228,20 +228,12 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br localBroker.start(); remoteBroker.start(); - if (configuration.isDuplex() && duplexInitiatingConnection == null) { - // initiator side of duplex network - remoteBrokerNameKnownLatch.await(); - } if (!disposed.get()) { try { triggerRemoteStartBridge(); } catch (IOException e) { LOG.warn("Caught exception from remote start", e); } - NetworkBridgeListener l = this.networkBridgeListener; - if (l != null) { - l.onStart(this); - } } else { LOG.warn ("Bridge was disposed before the start() method was fully executed."); throw new TransportDisposedIOException(); @@ -309,6 +301,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br localSessionInfo = new SessionInfo(localConnectionInfo, 1); localBroker.oneway(localSessionInfo); brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex); + NetworkBridgeListener l = this.networkBridgeListener; + if (l != null) { + l.onStart(this); + } LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established."); } else { @@ -419,6 +415,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br ss.throwFirstException(); } } + brokerService.getBroker().removeBroker(null, remoteBrokerInfo); brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo); LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped"); remoteBrokerNameKnownLatch.countDown(); @@ -480,6 +477,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br serviceRemoteBrokerInfo(command); // Let the local broker know the remote broker's ID. localBroker.oneway(command); + // new peer broker (a consumer can work with remote broker also) + brokerService.getBroker().addBroker(null, remoteBrokerInfo); } else if (command.getClass() == ConnectionError.class) { ConnectionError ce = (ConnectionError) command; serviceRemoteException(ce.getException()); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index 92cd1d037f..5be05330f1 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -120,6 +120,7 @@ public class FailoverTransport implements CompositeTransport { private SslContext brokerSslContext; private String updateURIsURL = null; private boolean rebalanceUpdateURIs=true; + private boolean doRebalance = false; public FailoverTransport() throws InterruptedIOException { brokerSslContext = SslContext.getCurrentSslContext(); @@ -131,7 +132,7 @@ public class FailoverTransport implements CompositeTransport { boolean buildBackup = true; boolean doReconnect = !disposed; synchronized (backupMutex) { - if (connectedTransport.get() == null && !disposed) { + if ((connectedTransport.get() == null || doRebalance) && !disposed) { result = doReconnect(); buildBackup = false; } @@ -623,7 +624,7 @@ public class FailoverTransport implements CompositeTransport { for (int i = 0; i < u.length; i++) { uris.remove(u[i]); } - reconnect(rebalance); + // rebalance is automatic if any connected to removed/stopped broker } public void add(boolean rebalance, String u) { @@ -643,15 +644,7 @@ public class FailoverTransport implements CompositeTransport { synchronized (reconnectMutex) { if (started) { if (rebalance) { - Transport transport = this.connectedTransport.getAndSet(null); - if (transport != null) { - try { - transport.stop(); - } catch (Exception e) { - LOG.debug("Caught an exception stopping existing transport", e); - } - } - + doRebalance = true; } LOG.debug("Waking up reconnect task"); try { @@ -683,7 +676,7 @@ public class FailoverTransport implements CompositeTransport { if (removed) { l.add(failedConnectTransportURI); } - LOG.debug("urlList connectionList:" + l); + LOG.debug("urlList connectionList:" + l + ", from: " + uris); return l; } @@ -798,13 +791,31 @@ public class FailoverTransport implements CompositeTransport { reconnectMutex.notifyAll(); } - if (connectedTransport.get() != null || disposed || connectionFailure != null) { + if ((connectedTransport.get() != null && !doRebalance) || disposed || connectionFailure != null) { return false; } else { List connectList = getConnectList(); if (connectList.isEmpty()) { failure = new IOException("No uris available to connect to."); } else { + if (doRebalance) { + if (connectList.get(0).equals(connectedTransportURI)) { + // already connected to first in the list, no need to rebalance + doRebalance = false; + return false; + } else { + LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList); + try { + Transport transport = this.connectedTransport.getAndSet(null); + if (transport != null) { + transport.stop(); + } + } catch (Exception e) { + LOG.debug("Caught an exception stopping existing transport for rebalance", e); + } + } + doRebalance = false; + } if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) { reconnectDelay = initialReconnectDelay; } diff --git a/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java index cdcd0c8054..ba0caf1b50 100644 --- a/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java @@ -152,19 +152,23 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport { maxSetupTime = 8000; } - - protected void waitForBridgeFormation() throws Exception { + + protected void waitForBridgeFormation(final int min) throws Exception { for (BrokerItem brokerItem : brokers.values()) { final BrokerService broker = brokerItem.broker; if (!broker.getNetworkConnectors().isEmpty()) { Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { - return !broker.getNetworkConnectors().get(0).activeBridges().isEmpty(); + return (broker.getNetworkConnectors().get(0).activeBridges().size() >= min); }}); } } } + protected void waitForBridgeFormation() throws Exception { + waitForBridgeFormation(1); + } + protected void startAllBrokers() throws Exception { Collection brokerList = brokers.values(); for (Iterator i = brokerList.iterator(); i.hasNext();) { @@ -517,6 +521,7 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport { } broker.stop(); + broker.waitUntilStopped(); consumers.clear(); broker = null; diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java index a866d7a70a..80de199f07 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java @@ -35,31 +35,32 @@ import org.apache.activemq.network.NetworkConnector; public class FailoverClusterTest extends TestCase { -private static final int NUMBER = 10; -private static final String BROKER_A_BIND_ADDRESS = "tcp://0.0.0.0:61616"; -private static final String BROKER_B_BIND_ADDRESS = "tcp://0.0.0.0:61617"; -private static final String CLIENT_URL = "failover://("+BROKER_A_BIND_ADDRESS+")"; -private static final String BROKER_A_NAME = "BROKERA"; -private static final String BROKER_B_NAME = "BROKERB"; -private BrokerService brokerA; -private BrokerService brokerB; -private final Listconnections = new ArrayList(); + private static final int NUMBER = 10; + private static final String BROKER_A_BIND_ADDRESS = "tcp://0.0.0.0:61616"; + private static final String BROKER_B_BIND_ADDRESS = "tcp://0.0.0.0:61617"; + private static final String BROKER_A_NAME = "BROKERA"; + private static final String BROKER_B_NAME = "BROKERB"; + private BrokerService brokerA; + private BrokerService brokerB; + private String clientUrl; + + private final List connections = new ArrayList(); - public void testClusterConnectedAfterClients() throws Exception{ - createClients(); - if (brokerB == null) { - brokerB = createBrokerB(BROKER_B_BIND_ADDRESS); - } - Thread.sleep(3000); - Set set = new HashSet(); - for (ActiveMQConnection c:connections) { - set.add(c.getTransportChannel().getRemoteAddress()); - } - assertTrue(set.size() > 1); - } + public void testClusterConnectedAfterClients() throws Exception { + createClients(); + if (brokerB == null) { + brokerB = createBrokerB(BROKER_B_BIND_ADDRESS); + } + Thread.sleep(3000); + Set set = new HashSet(); + for (ActiveMQConnection c : connections) { + set.add(c.getTransportChannel().getRemoteAddress()); + } + assertTrue(set.size() > 1); + } - public void testClusterURIOptionsStrip() throws Exception{ + public void testClusterURIOptionsStrip() throws Exception { createClients(); if (brokerB == null) { // add in server side only url param, should not be propagated @@ -67,45 +68,44 @@ private final Listconnections = new ArrayList set = new HashSet(); - for (ActiveMQConnection c:connections) { + for (ActiveMQConnection c : connections) { set.add(c.getTransportChannel().getRemoteAddress()); } assertTrue(set.size() > 1); } - - public void testClusterConnectedBeforeClients() throws Exception{ - - if (brokerB == null) { - brokerB = createBrokerB(BROKER_B_BIND_ADDRESS); - } - Thread.sleep(5000); - createClients(); - Thread.sleep(2000); - brokerA.stop(); - Thread.sleep(2000); - - URI brokerBURI = new URI(BROKER_B_BIND_ADDRESS); - for (ActiveMQConnection c:connections) { - String addr = c.getTransportChannel().getRemoteAddress(); - assertTrue(addr.indexOf(""+brokerBURI.getPort()) > 0); - } - } + + public void testClusterConnectedBeforeClients() throws Exception { + + if (brokerB == null) { + brokerB = createBrokerB(BROKER_B_BIND_ADDRESS); + } + Thread.sleep(5000); + createClients(); + Thread.sleep(2000); + brokerA.stop(); + Thread.sleep(2000); + + URI brokerBURI = new URI(BROKER_B_BIND_ADDRESS); + for (ActiveMQConnection c : connections) { + String addr = c.getTransportChannel().getRemoteAddress(); + assertTrue(addr.indexOf("" + brokerBURI.getPort()) > 0); + } + } @Override protected void setUp() throws Exception { if (brokerA == null) { - brokerA = createBrokerA(BROKER_A_BIND_ADDRESS + "?transport.closeAsync=false"); + brokerA = createBrokerA(BROKER_A_BIND_ADDRESS + "?transport.closeAsync=false"); + clientUrl = "failover://(" + brokerA.getTransportConnectors().get(0).getPublishableConnectString() + ")"; } - - } @Override protected void tearDown() throws Exception { - for (Connection c:connections) { + for (Connection c : connections) { c.close(); - } + } if (brokerB != null) { brokerB.stop(); brokerB = null; @@ -115,16 +115,16 @@ private final Listconnections = new ArrayListconnections = new ArrayList brokerList = brokers.values(); + for (Iterator i = brokerList.iterator(); i.hasNext();) { + BrokerService broker = i.next().broker; + List transportConnectors = broker.getTransportConnectors(); + + if (transportConnectors.isEmpty()) { + broker.addConnector(new URI(AUTO_ASSIGN_TRANSPORT)); + transportConnectors = broker.getTransportConnectors(); + } + + TransportConnector transport = transportConnectors.get(0); + if (transport.getDiscoveryUri() == null) { + transport.setDiscoveryUri(new URI("multicast://default?group=" + groupName)); + } + + List networkConnectors = broker.getNetworkConnectors(); + if (networkConnectors.isEmpty()) { + broker.addNetworkConnector("multicast://default?group=" + groupName); + networkConnectors = broker.getNetworkConnectors(); + } + + NetworkConnector nc = networkConnectors.get(0); + nc.setNetworkTTL(ttl); + nc.setSuppressDuplicateQueueSubscriptions(suppressduplicateQueueSubs); + nc.setDecreaseNetworkConsumerPriority(decreasePriority); + } + + // Multicasting may take longer to setup + maxSetupTime = 8000; + } + + protected BrokerService createBroker(String brokerName) throws Exception { + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.setBrokerName(brokerName); + broker.addConnector(new URI(AUTO_ASSIGN_TRANSPORT)); + brokers.put(brokerName, new BrokerItem(broker)); + + return broker; + } + + /* AMQ-3077 Bug */ + public void testBrokers() throws Exception { + int X = 20; + int i; + + LOG.info("Creating X Brokers"); + for (i = 0; i < X; i++) { + createBroker("Broker" + i); + } + + bridgeAllBrokers(); + startAllBrokers(); + waitForBridgeFormation(X-1); + + verifyPeerBrokerInfos(X-1); + + LOG.info("Stopping half the brokers"); + for (i = 0; i < X/2; i++) { + destroyBroker("Broker" + i); + } + + LOG.info("Waiting for complete stop"); + try { + Thread.sleep(10000); + } catch (Exception e) { + } + + verifyPeerBrokerInfos((X/2) -1); + + LOG.info("Recreating first half"); + for (i = 0; i < X/2; i++) { + createBroker("Broker" + i); + } + + bridgeAllBrokers(); + startAllBrokers(); + waitForBridgeFormation(X-1); + + verifyPeerBrokerInfos(X-1); + } + + public void testPeerBrokerCountHalfPeer() throws Exception { + createBroker("A"); + createBroker("B"); + bridgeBrokers("A", "B"); + startAllBrokers(); + verifyPeerBrokerInfo(brokers.get("A"), 1); + verifyPeerBrokerInfo(brokers.get("B"), 0); + } + + public void testPeerBrokerCountHalfPeerTwice() throws Exception { + createBroker("A"); + createBroker("B"); + bridgeBrokers("A", "B"); + bridgeBrokers("A", "B"); + startAllBrokers(); + verifyPeerBrokerInfo(brokers.get("A"), 1); + verifyPeerBrokerInfo(brokers.get("B"), 0); + } + + public void testPeerBrokerCountFullPeer() throws Exception { + createBroker("A"); + createBroker("B"); + bridgeBrokers("A", "B"); + bridgeBrokers("B", "A"); + startAllBrokers(); + verifyPeerBrokerInfo(brokers.get("A"), 1); + verifyPeerBrokerInfo(brokers.get("B"), 1); + } + + public void testPeerBrokerCountFullPeerDuplex() throws Exception { + createBroker("A"); + createBroker("B"); + NetworkConnector nc = bridgeBrokers("A", "B"); + nc.setDuplex(true); + startAllBrokers(); + verifyPeerBrokerInfo(brokers.get("A"), 1); + verifyPeerBrokerInfo(brokers.get("B"), 1); + } + + + private void verifyPeerBrokerInfo(BrokerItem brokerItem, final int max) { + BrokerService broker = brokerItem.broker; + RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker(); + LOG.info("verify infos " + broker.getBrokerName() + ", len: " + regionBroker.getPeerBrokerInfos().length); + for (BrokerInfo info : regionBroker.getPeerBrokerInfos()) { + LOG.info(info.getBrokerName()); + } + assertEquals(broker.getBrokerName(), max, regionBroker.getPeerBrokerInfos().length); + } + + private void verifyPeerBrokerInfos(final int max) { + Collection brokerList = brokers.values(); + for (Iterator i = brokerList.iterator(); i.hasNext();) { + verifyPeerBrokerInfo(i.next(), max); + } + } + + @Override + public void setUp() throws Exception { + super.setAutoFail(true); + super.setUp(); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + ThreadTracker.result(); + } +}