https://issues.apache.org/jira/browse/AMQ-3077 - ArraysIndexOutOfBoundsException : -32768 in "BrokerService[xxx] Task" thread - brokerInfo and peerBroker infro explosion problems. A peer is a oneway relationship with networks, broker infos were being accumulated in duplicate for each connector and for multiple connectors. The peer broker info was maintained for each which caused the problem marshalling. re: https://issues.apache.org/jira/browse/AMQ-2632 - the configuration is now respected so it can be selectively enabled and rebalance only occurs if we randomly choose an alternative. The nested peer broker info is not propagated in a connection control

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1058577 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-01-13 14:18:14 +00:00
parent 7b1cdbc080
commit c752230573
8 changed files with 324 additions and 93 deletions

View File

@ -1015,9 +1015,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
ignore.printStackTrace();
}
}
if (brokerInfo != null) {
broker.removeBroker(this, brokerInfo);
}
}
LOG.debug("Connection Stopped: " + getRemoteAddress());
}
@ -1182,7 +1179,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
// so this TransportConnection is the rear end of a network bridge
// We have been requested to create a two way pipe ...
try {
// We first look if existing network connection already exists for the same broker Id
// We first look if existing network connection already exists for the same broker Id and network connector name
// It's possible in case of brief network fault to have this transport connector side of the connection always active
// and the duplex network connector side wanting to open a new one
// In this case, the old connection must be broken
@ -1234,7 +1231,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
LOG.warn("Unexpected extra broker info command received: " + info);
}
this.brokerInfo = info;
broker.addBroker(this, info);
networkConnection = true;
List<TransportConnectionState> connectionStates = listConnectionStates();
for (TransportConnectionState cs : connectionStates) {

View File

@ -94,7 +94,7 @@ public class RegionBroker extends EmptyBroker {
private final CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<Connection>();
private final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
private final CopyOnWriteArrayList<BrokerInfo> brokerInfos = new CopyOnWriteArrayList<BrokerInfo>();
private final Map<BrokerId, BrokerInfo> brokerInfos = new HashMap<BrokerId, BrokerInfo>();
private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
private BrokerId brokerId;
@ -640,14 +640,25 @@ public class RegionBroker extends EmptyBroker {
@Override
public synchronized void addBroker(Connection connection, BrokerInfo info) {
brokerInfos.add(info);
BrokerInfo existing = brokerInfos.get(info.getBrokerId());
if (existing == null) {
existing = info.copy();
existing.setPeerBrokerInfos(null);
brokerInfos.put(info.getBrokerId(), existing);
}
existing.incrementRefCount();
LOG.debug(getBrokerName() + " addBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size());
addBrokerInClusterUpdate();
}
@Override
public synchronized void removeBroker(Connection connection, BrokerInfo info) {
if (info != null) {
brokerInfos.remove(info);
BrokerInfo existing = brokerInfos.get(info.getBrokerId());
if (existing != null && existing.decrementRefCount() == 0) {
brokerInfos.remove(info.getBrokerId());
}
LOG.debug(getBrokerName() + " removeBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size());
removeBrokerInClusterUpdate();
}
}
@ -655,7 +666,7 @@ public class RegionBroker extends EmptyBroker {
@Override
public synchronized BrokerInfo[] getPeerBrokerInfos() {
BrokerInfo[] result = new BrokerInfo[brokerInfos.size()];
result = brokerInfos.toArray(result);
result = brokerInfos.values().toArray(result);
return result;
}

View File

@ -45,6 +45,7 @@ public class BrokerInfo extends BaseCommand {
long connectionId;
String brokerUploadUrl;
String networkProperties;
transient int refCount = 0;
public BrokerInfo copy() {
BrokerInfo copy = new BrokerInfo();
@ -265,4 +266,15 @@ public class BrokerInfo extends BaseCommand {
}
return result;
}
public int getRefCount() {
return refCount;
}
public void incrementRefCount() {
refCount++;
}
public int decrementRefCount() {
return --refCount;
}
}

View File

@ -228,20 +228,12 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
localBroker.start();
remoteBroker.start();
if (configuration.isDuplex() && duplexInitiatingConnection == null) {
// initiator side of duplex network
remoteBrokerNameKnownLatch.await();
}
if (!disposed.get()) {
try {
triggerRemoteStartBridge();
} catch (IOException e) {
LOG.warn("Caught exception from remote start", e);
}
NetworkBridgeListener l = this.networkBridgeListener;
if (l != null) {
l.onStart(this);
}
} else {
LOG.warn ("Bridge was disposed before the start() method was fully executed.");
throw new TransportDisposedIOException();
@ -309,6 +301,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
localSessionInfo = new SessionInfo(localConnectionInfo, 1);
localBroker.oneway(localSessionInfo);
brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex);
NetworkBridgeListener l = this.networkBridgeListener;
if (l != null) {
l.onStart(this);
}
LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established.");
} else {
@ -419,6 +415,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
ss.throwFirstException();
}
}
brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped");
remoteBrokerNameKnownLatch.countDown();
@ -480,6 +477,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
serviceRemoteBrokerInfo(command);
// Let the local broker know the remote broker's ID.
localBroker.oneway(command);
// new peer broker (a consumer can work with remote broker also)
brokerService.getBroker().addBroker(null, remoteBrokerInfo);
} else if (command.getClass() == ConnectionError.class) {
ConnectionError ce = (ConnectionError) command;
serviceRemoteException(ce.getException());

View File

@ -120,6 +120,7 @@ public class FailoverTransport implements CompositeTransport {
private SslContext brokerSslContext;
private String updateURIsURL = null;
private boolean rebalanceUpdateURIs=true;
private boolean doRebalance = false;
public FailoverTransport() throws InterruptedIOException {
brokerSslContext = SslContext.getCurrentSslContext();
@ -131,7 +132,7 @@ public class FailoverTransport implements CompositeTransport {
boolean buildBackup = true;
boolean doReconnect = !disposed;
synchronized (backupMutex) {
if (connectedTransport.get() == null && !disposed) {
if ((connectedTransport.get() == null || doRebalance) && !disposed) {
result = doReconnect();
buildBackup = false;
}
@ -623,7 +624,7 @@ public class FailoverTransport implements CompositeTransport {
for (int i = 0; i < u.length; i++) {
uris.remove(u[i]);
}
reconnect(rebalance);
// rebalance is automatic if any connected to removed/stopped broker
}
public void add(boolean rebalance, String u) {
@ -643,15 +644,7 @@ public class FailoverTransport implements CompositeTransport {
synchronized (reconnectMutex) {
if (started) {
if (rebalance) {
Transport transport = this.connectedTransport.getAndSet(null);
if (transport != null) {
try {
transport.stop();
} catch (Exception e) {
LOG.debug("Caught an exception stopping existing transport", e);
}
}
doRebalance = true;
}
LOG.debug("Waking up reconnect task");
try {
@ -683,7 +676,7 @@ public class FailoverTransport implements CompositeTransport {
if (removed) {
l.add(failedConnectTransportURI);
}
LOG.debug("urlList connectionList:" + l);
LOG.debug("urlList connectionList:" + l + ", from: " + uris);
return l;
}
@ -798,13 +791,31 @@ public class FailoverTransport implements CompositeTransport {
reconnectMutex.notifyAll();
}
if (connectedTransport.get() != null || disposed || connectionFailure != null) {
if ((connectedTransport.get() != null && !doRebalance) || disposed || connectionFailure != null) {
return false;
} else {
List<URI> connectList = getConnectList();
if (connectList.isEmpty()) {
failure = new IOException("No uris available to connect to.");
} else {
if (doRebalance) {
if (connectList.get(0).equals(connectedTransportURI)) {
// already connected to first in the list, no need to rebalance
doRebalance = false;
return false;
} else {
LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList);
try {
Transport transport = this.connectedTransport.getAndSet(null);
if (transport != null) {
transport.stop();
}
} catch (Exception e) {
LOG.debug("Caught an exception stopping existing transport for rebalance", e);
}
}
doRebalance = false;
}
if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) {
reconnectDelay = initialReconnectDelay;
}

View File

@ -152,19 +152,23 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
maxSetupTime = 8000;
}
protected void waitForBridgeFormation() throws Exception {
protected void waitForBridgeFormation(final int min) throws Exception {
for (BrokerItem brokerItem : brokers.values()) {
final BrokerService broker = brokerItem.broker;
if (!broker.getNetworkConnectors().isEmpty()) {
Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return !broker.getNetworkConnectors().get(0).activeBridges().isEmpty();
return (broker.getNetworkConnectors().get(0).activeBridges().size() >= min);
}});
}
}
}
protected void waitForBridgeFormation() throws Exception {
waitForBridgeFormation(1);
}
protected void startAllBrokers() throws Exception {
Collection<BrokerItem> brokerList = brokers.values();
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
@ -517,6 +521,7 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
}
broker.stop();
broker.waitUntilStopped();
consumers.clear();
broker = null;

View File

@ -35,31 +35,32 @@ import org.apache.activemq.network.NetworkConnector;
public class FailoverClusterTest extends TestCase {
private static final int NUMBER = 10;
private static final String BROKER_A_BIND_ADDRESS = "tcp://0.0.0.0:61616";
private static final String BROKER_B_BIND_ADDRESS = "tcp://0.0.0.0:61617";
private static final String CLIENT_URL = "failover://("+BROKER_A_BIND_ADDRESS+")";
private static final String BROKER_A_NAME = "BROKERA";
private static final String BROKER_B_NAME = "BROKERB";
private BrokerService brokerA;
private BrokerService brokerB;
private final List<ActiveMQConnection>connections = new ArrayList<ActiveMQConnection>();
private static final int NUMBER = 10;
private static final String BROKER_A_BIND_ADDRESS = "tcp://0.0.0.0:61616";
private static final String BROKER_B_BIND_ADDRESS = "tcp://0.0.0.0:61617";
private static final String BROKER_A_NAME = "BROKERA";
private static final String BROKER_B_NAME = "BROKERB";
private BrokerService brokerA;
private BrokerService brokerB;
private String clientUrl;
private final List<ActiveMQConnection> connections = new ArrayList<ActiveMQConnection>();
public void testClusterConnectedAfterClients() throws Exception{
createClients();
if (brokerB == null) {
brokerB = createBrokerB(BROKER_B_BIND_ADDRESS);
}
Thread.sleep(3000);
Set<String> set = new HashSet<String>();
for (ActiveMQConnection c:connections) {
set.add(c.getTransportChannel().getRemoteAddress());
}
assertTrue(set.size() > 1);
}
public void testClusterConnectedAfterClients() throws Exception {
createClients();
if (brokerB == null) {
brokerB = createBrokerB(BROKER_B_BIND_ADDRESS);
}
Thread.sleep(3000);
Set<String> set = new HashSet<String>();
for (ActiveMQConnection c : connections) {
set.add(c.getTransportChannel().getRemoteAddress());
}
assertTrue(set.size() > 1);
}
public void testClusterURIOptionsStrip() throws Exception{
public void testClusterURIOptionsStrip() throws Exception {
createClients();
if (brokerB == null) {
// add in server side only url param, should not be propagated
@ -67,45 +68,44 @@ private final List<ActiveMQConnection>connections = new ArrayList<ActiveMQConnec
}
Thread.sleep(3000);
Set<String> set = new HashSet<String>();
for (ActiveMQConnection c:connections) {
for (ActiveMQConnection c : connections) {
set.add(c.getTransportChannel().getRemoteAddress());
}
assertTrue(set.size() > 1);
}
public void testClusterConnectedBeforeClients() throws Exception{
if (brokerB == null) {
brokerB = createBrokerB(BROKER_B_BIND_ADDRESS);
}
Thread.sleep(5000);
createClients();
Thread.sleep(2000);
brokerA.stop();
Thread.sleep(2000);
URI brokerBURI = new URI(BROKER_B_BIND_ADDRESS);
for (ActiveMQConnection c:connections) {
String addr = c.getTransportChannel().getRemoteAddress();
assertTrue(addr.indexOf(""+brokerBURI.getPort()) > 0);
}
}
public void testClusterConnectedBeforeClients() throws Exception {
if (brokerB == null) {
brokerB = createBrokerB(BROKER_B_BIND_ADDRESS);
}
Thread.sleep(5000);
createClients();
Thread.sleep(2000);
brokerA.stop();
Thread.sleep(2000);
URI brokerBURI = new URI(BROKER_B_BIND_ADDRESS);
for (ActiveMQConnection c : connections) {
String addr = c.getTransportChannel().getRemoteAddress();
assertTrue(addr.indexOf("" + brokerBURI.getPort()) > 0);
}
}
@Override
protected void setUp() throws Exception {
if (brokerA == null) {
brokerA = createBrokerA(BROKER_A_BIND_ADDRESS + "?transport.closeAsync=false");
brokerA = createBrokerA(BROKER_A_BIND_ADDRESS + "?transport.closeAsync=false");
clientUrl = "failover://(" + brokerA.getTransportConnectors().get(0).getPublishableConnectString() + ")";
}
}
@Override
protected void tearDown() throws Exception {
for (Connection c:connections) {
for (Connection c : connections) {
c.close();
}
}
if (brokerB != null) {
brokerB.stop();
brokerB = null;
@ -115,16 +115,16 @@ private final List<ActiveMQConnection>connections = new ArrayList<ActiveMQConnec
brokerA = null;
}
}
protected BrokerService createBrokerA(String uri) throws Exception {
BrokerService answer = new BrokerService();
answer.setUseJmx(false);
configureConsumerBroker(answer,uri);
configureConsumerBroker(answer, uri);
answer.start();
return answer;
}
protected void configureConsumerBroker(BrokerService answer,String uri) throws Exception {
protected void configureConsumerBroker(BrokerService answer, String uri) throws Exception {
answer.setBrokerName(BROKER_A_NAME);
answer.setPersistent(false);
TransportConnector connector = answer.addConnector(uri);
@ -132,33 +132,33 @@ private final List<ActiveMQConnection>connections = new ArrayList<ActiveMQConnec
connector.setUpdateClusterClients(true);
answer.setUseShutdownHook(false);
}
protected BrokerService createBrokerB(String uri) throws Exception {
BrokerService answer = new BrokerService();
answer.setUseJmx(false);
configureNetwork(answer,uri);
configureNetwork(answer, uri);
answer.start();
return answer;
}
protected void configureNetwork(BrokerService answer,String uri) throws Exception {
protected void configureNetwork(BrokerService answer, String uri) throws Exception {
answer.setBrokerName(BROKER_B_NAME);
answer.setPersistent(false);
NetworkConnector network = answer.addNetworkConnector("static://"+BROKER_A_BIND_ADDRESS);
NetworkConnector network = answer.addNetworkConnector("static://" + BROKER_A_BIND_ADDRESS);
network.setDuplex(true);
TransportConnector connector =answer.addConnector(uri);
TransportConnector connector = answer.addConnector(uri);
connector.setRebalanceClusterClients(true);
connector.setUpdateClusterClients(true);
answer.setUseShutdownHook(false);
}
protected void createClients() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(CLIENT_URL);
for (int i =0;i < NUMBER; i++) {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(clientUrl);
for (int i = 0; i < NUMBER; i++) {
ActiveMQConnection c = (ActiveMQConnection) factory.createConnection();
c.start();
Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = s.createQueue(getClass().getName());
Queue queue = s.createQueue(getClass().getName());
MessageConsumer consumer = s.createConsumer(queue);
connections.add(c);
}

View File

@ -0,0 +1,197 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.net.URI;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.ThreadTracker;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class NetworkOfTwentyBrokersTest extends JmsMultipleBrokersTestSupport {
private static final Log LOG = LogFactory.getLog(NetworkOfTwentyBrokersTest.class);
// This will interconnect all brokers using multicast
protected void bridgeAllBrokers() throws Exception {
bridgeAllBrokers("TwentyBrokersTest", 1, false, false);
}
protected void bridgeAllBrokers(String groupName, int ttl, boolean suppressduplicateQueueSubs) throws Exception {
bridgeAllBrokers(groupName, ttl, suppressduplicateQueueSubs, false);
}
protected void bridgeAllBrokers(String groupName, int ttl, boolean suppressduplicateQueueSubs, boolean decreasePriority) throws Exception {
Collection<BrokerItem> brokerList = brokers.values();
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
BrokerService broker = i.next().broker;
List<TransportConnector> transportConnectors = broker.getTransportConnectors();
if (transportConnectors.isEmpty()) {
broker.addConnector(new URI(AUTO_ASSIGN_TRANSPORT));
transportConnectors = broker.getTransportConnectors();
}
TransportConnector transport = transportConnectors.get(0);
if (transport.getDiscoveryUri() == null) {
transport.setDiscoveryUri(new URI("multicast://default?group=" + groupName));
}
List<NetworkConnector> networkConnectors = broker.getNetworkConnectors();
if (networkConnectors.isEmpty()) {
broker.addNetworkConnector("multicast://default?group=" + groupName);
networkConnectors = broker.getNetworkConnectors();
}
NetworkConnector nc = networkConnectors.get(0);
nc.setNetworkTTL(ttl);
nc.setSuppressDuplicateQueueSubscriptions(suppressduplicateQueueSubs);
nc.setDecreaseNetworkConsumerPriority(decreasePriority);
}
// Multicasting may take longer to setup
maxSetupTime = 8000;
}
protected BrokerService createBroker(String brokerName) throws Exception {
BrokerService broker = new BrokerService();
broker.setPersistent(false);
broker.setUseJmx(false);
broker.setBrokerName(brokerName);
broker.addConnector(new URI(AUTO_ASSIGN_TRANSPORT));
brokers.put(brokerName, new BrokerItem(broker));
return broker;
}
/* AMQ-3077 Bug */
public void testBrokers() throws Exception {
int X = 20;
int i;
LOG.info("Creating X Brokers");
for (i = 0; i < X; i++) {
createBroker("Broker" + i);
}
bridgeAllBrokers();
startAllBrokers();
waitForBridgeFormation(X-1);
verifyPeerBrokerInfos(X-1);
LOG.info("Stopping half the brokers");
for (i = 0; i < X/2; i++) {
destroyBroker("Broker" + i);
}
LOG.info("Waiting for complete stop");
try {
Thread.sleep(10000);
} catch (Exception e) {
}
verifyPeerBrokerInfos((X/2) -1);
LOG.info("Recreating first half");
for (i = 0; i < X/2; i++) {
createBroker("Broker" + i);
}
bridgeAllBrokers();
startAllBrokers();
waitForBridgeFormation(X-1);
verifyPeerBrokerInfos(X-1);
}
public void testPeerBrokerCountHalfPeer() throws Exception {
createBroker("A");
createBroker("B");
bridgeBrokers("A", "B");
startAllBrokers();
verifyPeerBrokerInfo(brokers.get("A"), 1);
verifyPeerBrokerInfo(brokers.get("B"), 0);
}
public void testPeerBrokerCountHalfPeerTwice() throws Exception {
createBroker("A");
createBroker("B");
bridgeBrokers("A", "B");
bridgeBrokers("A", "B");
startAllBrokers();
verifyPeerBrokerInfo(brokers.get("A"), 1);
verifyPeerBrokerInfo(brokers.get("B"), 0);
}
public void testPeerBrokerCountFullPeer() throws Exception {
createBroker("A");
createBroker("B");
bridgeBrokers("A", "B");
bridgeBrokers("B", "A");
startAllBrokers();
verifyPeerBrokerInfo(brokers.get("A"), 1);
verifyPeerBrokerInfo(brokers.get("B"), 1);
}
public void testPeerBrokerCountFullPeerDuplex() throws Exception {
createBroker("A");
createBroker("B");
NetworkConnector nc = bridgeBrokers("A", "B");
nc.setDuplex(true);
startAllBrokers();
verifyPeerBrokerInfo(brokers.get("A"), 1);
verifyPeerBrokerInfo(brokers.get("B"), 1);
}
private void verifyPeerBrokerInfo(BrokerItem brokerItem, final int max) {
BrokerService broker = brokerItem.broker;
RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
LOG.info("verify infos " + broker.getBrokerName() + ", len: " + regionBroker.getPeerBrokerInfos().length);
for (BrokerInfo info : regionBroker.getPeerBrokerInfos()) {
LOG.info(info.getBrokerName());
}
assertEquals(broker.getBrokerName(), max, regionBroker.getPeerBrokerInfos().length);
}
private void verifyPeerBrokerInfos(final int max) {
Collection<BrokerItem> brokerList = brokers.values();
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
verifyPeerBrokerInfo(i.next(), max);
}
}
@Override
public void setUp() throws Exception {
super.setAutoFail(true);
super.setUp();
}
@Override
public void tearDown() throws Exception {
super.tearDown();
ThreadTracker.result();
}
}