https://issues.apache.org/jira/browse/AMQ-3706 - use round-robin instead of rnd for cluster rebalancing

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1311237 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2012-04-09 14:07:03 +00:00
parent 392d017c6e
commit 2280719835
3 changed files with 69 additions and 65 deletions

View File

@ -712,6 +712,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
// send ConnectionCommand
ConnectionControl command = this.connector.getConnectionControl();
command.setFaultTolerant(broker.isFaultTolerantConfiguration());
if (info.isFailoverReconnect()) {
command.setRebalanceConnection(false);
}
dispatchAsync(command);
}
return null;

View File

@ -16,19 +16,6 @@
*/
package org.apache.activemq.broker;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.Random;
import java.util.StringTokenizer;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.regex.Pattern;
import javax.management.ObjectName;
import org.apache.activemq.broker.jmx.ManagedTransportConnector;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.region.ConnectorStatistics;
@ -48,6 +35,16 @@ import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.management.ObjectName;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.StringTokenizer;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.regex.Pattern;
/**
* @org.apache.xbean.XBean
*
@ -78,7 +75,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
private String updateClusterFilter;
private boolean auditNetworkProducers = false;
Random rnd = new Random(System.currentTimeMillis());
LinkedList<String> peerBrokers = new LinkedList<String>();
public TransportConnector() {
}
@ -410,23 +407,17 @@ public class TransportConnector implements Connector, BrokerServiceAware {
String separator = "";
if (isUpdateClusterClients()) {
ArrayList<String> uris = new ArrayList<String>();
uris.add(brokerService.getDefaultSocketURIString());
for (BrokerInfo info: broker.getPeerBrokerInfos()) {
if (isMatchesClusterFilter(info.getBrokerName())) {
if (info.getBrokerURL() != null) {
uris.add(info.getBrokerURL());
}
synchronized (peerBrokers) {
for (String uri : getPeerBrokers()) {
connectedBrokers += separator + uri;
separator = ",";
}
if (rebalance) {
String shuffle = getPeerBrokers().removeFirst();
getPeerBrokers().addLast(shuffle);
}
}
if (rebalance) {
Collections.shuffle(uris, rnd);
}
for (String uri: uris) {
connectedBrokers += separator + uri;
separator = ",";
}
}
ConnectionControl control = new ConnectionControl();
control.setConnectedBrokers(connectedBrokers);
@ -434,8 +425,32 @@ public class TransportConnector implements Connector, BrokerServiceAware {
return control;
}
public void addPeerBroker(BrokerInfo info) {
if (isMatchesClusterFilter(info.getBrokerName())) {
synchronized (peerBrokers) {
getPeerBrokers().addLast(info.getBrokerURL());
}
}
}
public void removePeerBroker(BrokerInfo info) {
synchronized (peerBrokers) {
getPeerBrokers().remove(info.getBrokerURL());
}
}
public LinkedList<String> getPeerBrokers() {
synchronized (peerBrokers) {
if (peerBrokers.isEmpty()) {
peerBrokers.add(brokerService.getDefaultSocketURIString());
}
return peerBrokers;
}
}
public void updateClientClusterInfo() {
if (isRebalanceClusterClients() || isUpdateClusterClients()) {
ConnectionControl control = getConnectionControl();
for (Connection c : this.connections) {

View File

@ -16,20 +16,6 @@
*/
package org.apache.activemq.broker.region;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Connection;
@ -37,27 +23,10 @@ import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.EmptyBroker;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.*;
import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.thread.Scheduler;
@ -71,6 +40,21 @@ import org.apache.activemq.util.ServiceStopper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Routes Broker operations to the correct messaging regions for processing.
*
@ -649,7 +633,7 @@ public class RegionBroker extends EmptyBroker {
if (LOG.isDebugEnabled()) {
LOG.debug(getBrokerName() + " addBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size());
}
addBrokerInClusterUpdate();
addBrokerInClusterUpdate(info);
}
@Override
@ -662,7 +646,7 @@ public class RegionBroker extends EmptyBroker {
if (LOG.isDebugEnabled()) {
LOG.debug(getBrokerName() + " removeBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size());
}
removeBrokerInClusterUpdate();
removeBrokerInClusterUpdate(info);
}
}
@ -913,19 +897,21 @@ public class RegionBroker extends EmptyBroker {
}
}
protected void addBrokerInClusterUpdate() {
protected void addBrokerInClusterUpdate(BrokerInfo info) {
List<TransportConnector> connectors = this.brokerService.getTransportConnectors();
for (TransportConnector connector : connectors) {
if (connector.isUpdateClusterClients()) {
connector.addPeerBroker(info);
connector.updateClientClusterInfo();
}
}
}
protected void removeBrokerInClusterUpdate() {
protected void removeBrokerInClusterUpdate(BrokerInfo info) {
List<TransportConnector> connectors = this.brokerService.getTransportConnectors();
for (TransportConnector connector : connectors) {
if (connector.isUpdateClusterClients() && connector.isUpdateClusterClientsOnRemove()) {
connector.removePeerBroker(info);
connector.updateClientClusterInfo();
}
}