git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@920306 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2010-03-08 12:48:45 +00:00
parent cc18e703b2
commit f392884e31
41 changed files with 1014 additions and 294 deletions

View File

@ -215,6 +215,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
this.info = new ConnectionInfo(new ConnectionId(CONNECTION_ID_GENERATOR.generateId()));
this.info.setManageable(true);
this.info.setFaultTolerant(transport.isFaultTolerant());
this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
this.transport.setTransportListener(this);

View File

@ -440,10 +440,9 @@ public class AdvisoryBroker extends BrokerFilter {
String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET";
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id);
String[] uris = getBrokerService().getTransportConnectorURIs();
String url = getBrokerService().getVmConnectorURI().toString();
if (uris != null && uris.length > 0) {
url = uris[0];
if (getBrokerService().getDefaultSocketURI() != null) {
url = getBrokerService().getDefaultSocketURI().toString();
}
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url);

View File

@ -31,7 +31,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.management.MalformedObjectNameException;
@ -154,6 +153,7 @@ public class BrokerService implements Service {
private boolean deleteAllMessagesOnStartup;
private boolean advisorySupport = true;
private URI vmConnectorURI;
private URI defaultSocketURI;
private PolicyMap destinationPolicy;
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean stopped = new AtomicBoolean(false);
@ -1272,6 +1272,28 @@ public class BrokerService implements Service {
this.vmConnectorURI = vmConnectorURI;
}
public URI getDefaultSocketURI() {
if (started.get()) {
if (this.defaultSocketURI==null) {
for (TransportConnector tc:this.transportConnectors) {
URI result = null;
try {
result = tc.getConnectUri();
} catch (Exception e) {
LOG.warn("Failed to get the ConnectURI for "+tc,e);
}
if (result != null) {
this.defaultSocketURI=result;
break;
}
}
}
return this.defaultSocketURI;
}
return null;
}
/**
* @return Returns the shutdownOnMasterFailure.
*/
@ -2007,6 +2029,9 @@ public class BrokerService implements Service {
connector.setLocalUri(uri);
connector.setBrokerName(getBrokerName());
connector.setDurableDestinations(durableDestinations);
if (getDefaultSocketURI() != null) {
connector.setBrokerURL(getDefaultSocketURI().toString());
}
connector.start();
}
for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {

View File

@ -17,10 +17,10 @@
package org.apache.activemq.broker;
import java.io.IOException;
import org.apache.activemq.Service;
import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.Response;
/**
@ -51,6 +51,7 @@ public interface Connection extends Service {
* Services a client command and submits it to the broker.
*
* @param command
* @return Response
*/
Response service(Command command);
@ -111,4 +112,11 @@ public interface Connection extends Service {
*/
boolean isNetworkConnection();
/**
* @return true if a fault tolerant connection
*/
boolean isFaultTolerantConnection();
void updateClient(ConnectionControl control);
}

View File

@ -28,8 +28,7 @@ import org.apache.activemq.command.BrokerInfo;
public interface Connector extends Service {
/**
*
* @return
* @return brokerInfo
*/
BrokerInfo getBrokerInfo();
@ -37,4 +36,21 @@ public interface Connector extends Service {
* @return the statistics for this connector
*/
ConnectorStatistics getStatistics();
/**
* @return true if update client connections when brokers leave/join a cluster
*/
public boolean isUpdateClusterClients();
/**
* @return true if clients should be re-balanced across the cluster
*/
public boolean isRebalanceClusterClients();
/**
* Update all the connections with information
* about the connected brokers in the cluster
*/
public void updateClientClusterInfo();
}

View File

@ -121,7 +121,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
// Used to do async dispatch.. this should perhaps be pushed down into the
// transport layer..
private boolean inServiceException;
private ConnectionStatistics statistics = new ConnectionStatistics();
private final ConnectionStatistics statistics = new ConnectionStatistics();
private boolean manageable;
private boolean slow;
private boolean markedCandidate;
@ -133,15 +133,15 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
private boolean pendingStop;
private long timeStamp;
private final AtomicBoolean stopping = new AtomicBoolean(false);
private CountDownLatch stopped = new CountDownLatch(1);
private final CountDownLatch stopped = new CountDownLatch(1);
private final AtomicBoolean asyncException = new AtomicBoolean(false);
private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>();
private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>();
private CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
private ConnectionContext context;
private boolean networkConnection;
private boolean faultTolerantConnection;
private AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
private DemandForwardingBridge duplexBridge;
private final TaskRunnerFactory taskRunnerFactory;
private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
@ -168,6 +168,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
this.taskRunnerFactory = taskRunnerFactory;
this.transport = transport;
this.transport.setTransportListener(new DefaultTransportListener() {
@Override
public void onCommand(Object o) {
serviceLock.readLock().lock();
try {
@ -184,6 +185,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
}
@Override
public void onException(IOException exception) {
serviceLock.readLock().lock();
try {
@ -241,6 +243,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
public void serviceExceptionAsync(final IOException e) {
if (asyncException.compareAndSet(false, true)) {
new Thread("Async Exception Handler") {
@Override
public void run() {
serviceException(e);
}
@ -654,6 +657,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
registerConnectionState(info.getConnectionId(), state);
LOG.debug("Setting up new connection id: " + info.getConnectionId() + ", address: " + getRemoteAddress());
this.faultTolerantConnection=info.isFaultTolerant();
// Setup the context.
String clientId = info.getClientId();
context = new ConnectionContext();
@ -672,6 +676,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
this.manageable = info.isManageable();
state.setContext(context);
state.setConnection(this);
try {
broker.addConnection(context, info);
} catch (Exception e) {
@ -679,9 +684,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
LOG.warn("Failed to add Connection", e);
throw e;
}
if (info.isManageable() && broker.isFaultTolerantConfiguration()) {
if (info.isManageable()) {
// send ConnectionCommand
ConnectionControl command = new ConnectionControl();
ConnectionControl command = this.connector.getConnectionControl();
command.setFaultTolerant(broker.isFaultTolerantConfiguration());
dispatchAsync(command);
}
@ -867,7 +872,10 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
transport.start();
active = true;
dispatchAsync(connector.getBrokerInfo());
BrokerInfo info = connector.getBrokerInfo().copy();
info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos());
dispatchAsync(info);
connector.onStarted(this);
}
} catch (Exception e) {
@ -1121,6 +1129,10 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return networkConnection;
}
public boolean isFaultTolerantConnection() {
return this.faultTolerantConnection;
}
protected synchronized void setStarting(boolean starting) {
this.starting = starting;
}
@ -1223,6 +1235,13 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return null;
}
public void updateClient(ConnectionControl control) {
if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null
&& this.wireFormatInfo.getVersion() >= 6) {
dispatchAsync(control);
}
}
private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) {
ProducerBrokerExchange result = producerExchanges.get(id);
if (result == null) {

View File

@ -16,10 +16,17 @@
*/
package org.apache.activemq.broker;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.management.ObjectName;
import org.apache.activemq.broker.jmx.ManagedTransportConnector;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.region.ConnectorStatistics;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.TaskRunnerFactory;
@ -34,6 +41,7 @@ import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import static org.apache.activemq.thread.DefaultThreadPools.*;
import java.io.IOException;
@ -53,7 +61,6 @@ public class TransportConnector implements Connector, BrokerServiceAware {
protected CopyOnWriteArrayList<TransportConnection> connections = new CopyOnWriteArrayList<TransportConnection>();
protected TransportStatusDetector statusDector;
private BrokerService brokerService;
private TransportServer server;
private URI uri;
@ -61,13 +68,15 @@ public class TransportConnector implements Connector, BrokerServiceAware {
private TaskRunnerFactory taskRunnerFactory;
private MessageAuthorizationPolicy messageAuthorizationPolicy;
private DiscoveryAgent discoveryAgent;
private ConnectorStatistics statistics = new ConnectorStatistics();
private final ConnectorStatistics statistics = new ConnectorStatistics();
private URI discoveryUri;
private URI connectUri;
private String name;
private boolean disableAsyncDispatch;
private boolean enableStatusMonitor = false;
private Broker broker;
private boolean updateClusterClients=false;
private boolean rebalanceClusterClients;
public TransportConnector() {
}
@ -109,6 +118,8 @@ public class TransportConnector implements Connector, BrokerServiceAware {
rc.setTaskRunnerFactory(getTaskRunnerFactory());
rc.setUri(getUri());
rc.setBrokerService(brokerService);
rc.setUpdateClusterClients(isUpdateClusterClients());
rc.setRebalanceClusterClients(isRebalanceClusterClients());
return rc;
}
@ -193,16 +204,13 @@ public class TransportConnector implements Connector, BrokerServiceAware {
}
public void start() throws Exception {
TransportServer server = getServer();
broker = brokerService.getBroker();
brokerInfo.setBrokerName(broker.getBrokerName());
brokerInfo.setBrokerId(broker.getBrokerId());
brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos());
brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration());
brokerInfo.setBrokerURL(server.getConnectURI().toString());
server.setAcceptListener(new TransportAcceptListener() {
public void onAccept(final Transport transport) {
try {
@ -233,7 +241,6 @@ public class TransportConnector implements Connector, BrokerServiceAware {
LOG.debug("Reason: " + error, error);
}
});
server.setBrokerInfo(brokerInfo);
server.start();
@ -366,6 +373,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
this.name = name;
}
@Override
public String toString() {
String rc = getName();
if (rc == null) {
@ -374,6 +382,43 @@ public class TransportConnector implements Connector, BrokerServiceAware {
return rc;
}
protected ConnectionControl getConnectionControl() {
boolean rebalance = isRebalanceClusterClients();
String connectedBrokers = "";
String self = "";
if (brokerService.getDefaultSocketURI() != null) {
self += brokerService.getDefaultSocketURI().toString();
self += ",";
}
if (rebalance == false) {
connectedBrokers += self;
}
if (this.broker.getPeerBrokerInfos() != null) {
for (BrokerInfo info : this.broker.getPeerBrokerInfos()) {
connectedBrokers += info.getBrokerURL();
connectedBrokers += ",";
}
}
if (rebalance) {
connectedBrokers += self;
}
ConnectionControl control = new ConnectionControl();
control.setConnectedBrokers(connectedBrokers);
control.setRebalanceConnection(rebalance);
return control;
}
public void updateClientClusterInfo() {
if (isRebalanceClusterClients() || isUpdateClusterClients()) {
ConnectionControl control = getConnectionControl();
for (Connection c: this.connections) {
c.updateClient(control);
}
}
}
public boolean isDisableAsyncDispatch() {
return disableAsyncDispatch;
}
@ -410,4 +455,32 @@ public class TransportConnector implements Connector, BrokerServiceAware {
public BrokerService getBrokerService() {
return brokerService;
}
/**
* @return the updateClusterClients
*/
public boolean isUpdateClusterClients() {
return this.updateClusterClients;
}
/**
* @param updateClusterClients the updateClusterClients to set
*/
public void setUpdateClusterClients(boolean updateClusterClients) {
this.updateClusterClients = updateClusterClients;
}
/**
* @return the rebalanceClusterClients
*/
public boolean isRebalanceClusterClients() {
return this.rebalanceClusterClients;
}
/**
* @param rebalanceClusterClients the rebalanceClusterClients to set
*/
public void setRebalanceClusterClients(boolean rebalanceClusterClients) {
this.rebalanceClusterClients = rebalanceClusterClients;
}
}

View File

@ -21,6 +21,7 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@ -34,6 +35,7 @@ import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.EmptyBroker;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
@ -585,12 +587,14 @@ public class RegionBroker extends EmptyBroker {
@Override
public synchronized void addBroker(Connection connection, BrokerInfo info) {
brokerInfos.add(info);
updateClients();
}
@Override
public synchronized void removeBroker(Connection connection, BrokerInfo info) {
if (info != null) {
brokerInfos.remove(info);
updateClients();
}
}
@ -830,4 +834,11 @@ public class RegionBroker extends EmptyBroker {
LOG.warn("unmatched destination: " + destination + ", in consumerControl: " + control);
}
}
protected void updateClients() {
List<TransportConnector> connectors = this.brokerService.getTransportConnectors();
for (TransportConnector connector : connectors) {
connector.updateClientClusterInfo();
}
}
}

View File

@ -17,7 +17,6 @@
package org.apache.activemq.command;
import java.util.Map;
import org.apache.activemq.util.IntrospectionSupport;
@ -61,6 +60,7 @@ public abstract class BaseCommand implements Command {
this.responseRequired = responseRequired;
}
@Override
public String toString() {
return toString(null);
}
@ -105,6 +105,10 @@ public abstract class BaseCommand implements Command {
return false;
}
public boolean isConnectionControl() {
return false;
}
/**
* The endpoint within the transport where this message came from.
*/

View File

@ -16,13 +16,12 @@
*/
package org.apache.activemq.command;
import org.apache.activemq.plugin.StatisticsBrokerPlugin;
import java.io.IOException;
import java.util.Properties;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.util.Properties;
/**
* When a client connects to a broker, the broker send the client a BrokerInfo
@ -50,6 +49,29 @@ public class BrokerInfo extends BaseCommand {
String brokerUploadUrl;
String networkProperties;
public BrokerInfo copy() {
BrokerInfo copy = new BrokerInfo();
copy(copy);
return copy;
}
private void copy(BrokerInfo copy) {
super.copy(copy);
copy.brokerId = this.brokerId;
copy.brokerURL = this.brokerURL;
copy.slaveBroker = this.slaveBroker;
copy.masterBroker = this.masterBroker;
copy.faultTolerantConfiguration = this.faultTolerantConfiguration;
copy.networkConnection = this.networkConnection;
copy.duplexConnection = this.duplexConnection;
copy.peerBrokerInfos = this.peerBrokerInfos;
copy.brokerName = this.brokerName;
copy.connectionId = this.connectionId;
copy.brokerUploadUrl = this.brokerUploadUrl;
copy.networkProperties = this.networkProperties;
}
@Override
public boolean isBrokerInfo() {
return true;
}

View File

@ -53,6 +53,8 @@ public interface Command extends DataStructure {
boolean isShutdownInfo();
boolean isConnectionControl();
Response visit(CommandVisitor visitor) throws Exception;
/**

View File

@ -31,6 +31,9 @@ public class ConnectionControl extends BaseCommand {
protected boolean close;
protected boolean exit;
protected boolean faultTolerant;
protected String connectedBrokers="";
protected String reconnectTo = "";
protected boolean rebalanceConnection;
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
@ -39,6 +42,10 @@ public class ConnectionControl extends BaseCommand {
public Response visit(CommandVisitor visitor) throws Exception {
return visitor.processConnectionControl(this);
}
@Override
public boolean isConnectionControl() {
return true;
}
/**
* @openwire:property version=1
@ -114,4 +121,49 @@ public class ConnectionControl extends BaseCommand {
public void setSuspend(boolean suspend) {
this.suspend = suspend;
}
/**
* @openwire:property version=6 cache=false
* @return connected brokers.
*/
public String getConnectedBrokers() {
return this.connectedBrokers;
}
/**
* @param connectedBrokers the connectedBrokers to set
*/
public void setConnectedBrokers(String connectedBrokers) {
this.connectedBrokers = connectedBrokers;
}
/**
* @openwire:property version=6 cache=false
* @return the reconnectTo
*/
public String getReconnectTo() {
return this.reconnectTo;
}
/**
* @param reconnectTo the reconnectTo to set
*/
public void setReconnectTo(String reconnectTo) {
this.reconnectTo = reconnectTo;
}
/**
* @return the rebalanceConnection
* @openwire:property version=6 cache=false
*/
public boolean isRebalanceConnection() {
return this.rebalanceConnection;
}
/**
* @param rebalanceConnection the rebalanceConnection to set
*/
public void setRebalanceConnection(boolean rebalanceConnection) {
this.rebalanceConnection = rebalanceConnection;
}
}

View File

@ -35,6 +35,7 @@ public class ConnectionInfo extends BaseCommand {
protected boolean brokerMasterConnector;
protected boolean manageable;
protected boolean clientMaster = true;
protected boolean faultTolerant = false;
protected transient Object transportContext;
public ConnectionInfo() {
@ -65,6 +66,7 @@ public class ConnectionInfo extends BaseCommand {
copy.manageable = manageable;
copy.clientMaster = clientMaster;
copy.transportContext = transportContext;
copy.faultTolerant= faultTolerant;
}
/**
@ -199,4 +201,19 @@ public class ConnectionInfo extends BaseCommand {
this.clientMaster = clientMaster;
}
/**
* @openwire:property version=6 cache=false
* @return the faultTolerant
*/
public boolean isFaultTolerant() {
return this.faultTolerant;
}
/**
* @param faultTolerant the faultTolerant to set
*/
public void setFaultTolerant(boolean faultTolerant) {
this.faultTolerant = faultTolerant;
}
}

View File

@ -118,6 +118,10 @@ public class PartialCommand implements Command {
return false;
}
public boolean isConnectionControl() {
return false;
}
public void setResponseRequired(boolean responseRequired) {
}
@ -137,5 +141,4 @@ public class PartialCommand implements Command {
return "PartialCommand[id: " + commandId + " data: " + size + " byte(s)]";
}
}

View File

@ -47,14 +47,12 @@ public class RemoveSubscriptionInfo extends BaseCommand {
/**
* @openwire:property version=1
* @deprecated
*/
public String getSubcriptionName() {
return subscriptionName;
}
/**
* @deprecated
*/
public void setSubcriptionName(String subscriptionName) {
this.subscriptionName = subscriptionName;

View File

@ -76,7 +76,6 @@ public class SubscriptionInfo implements DataStructure {
/**
* @openwire:property version=1
* @deprecated
*/
public String getSubcriptionName() {
return subscriptionName;
@ -84,7 +83,6 @@ public class SubscriptionInfo implements DataStructure {
/**
* @param subscriptionName *
* @deprecated
*/
public void setSubcriptionName(String subscriptionName) {
this.subscriptionName = subscriptionName;
@ -102,16 +100,19 @@ public class SubscriptionInfo implements DataStructure {
return false;
}
@Override
public String toString() {
return IntrospectionSupport.toString(this);
}
@Override
public int hashCode() {
int h1 = clientId != null ? clientId.hashCode() : -1;
int h2 = subscriptionName != null ? subscriptionName.hashCode() : -1;
return h1 ^ h2;
}
@Override
public boolean equals(Object obj) {
boolean result = false;
if (obj instanceof SubscriptionInfo) {

View File

@ -23,7 +23,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
@ -290,6 +289,7 @@ public class WireFormatInfo implements Command, MarshallAware {
return visitor.processWireFormat(this);
}
@Override
public String toString() {
Map<String, Object> p = null;
try {
@ -357,6 +357,10 @@ public class WireFormatInfo implements Command, MarshallAware {
return false;
}
public boolean isConnectionControl() {
return false;
}
public void setCachedMarshalledForm(WireFormat wireFormat, ByteSequence data) {
}

View File

@ -30,7 +30,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
@ -131,7 +130,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
private BrokerInfo localBrokerInfo;
private BrokerInfo remoteBrokerInfo;
private AtomicBoolean started = new AtomicBoolean();
private final AtomicBoolean started = new AtomicBoolean();
private TransportConnection duplexInitiatingConnection;
private BrokerService brokerService = null;
@ -153,11 +152,13 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
if (started.compareAndSet(false, true)) {
localBroker.setTransportListener(new DefaultTransportListener() {
@Override
public void onCommand(Object o) {
Command command = (Command) o;
serviceLocalCommand(command);
}
@Override
public void onException(IOException error) {
serviceLocalException(error);
}
@ -318,6 +319,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
if (!isCreatedByDuplex()) {
BrokerInfo brokerInfo = new BrokerInfo();
brokerInfo.setBrokerName(configuration.getBrokerName());
brokerInfo.setBrokerURL(configuration.getBrokerURL());
brokerInfo.setNetworkConnection(true);
brokerInfo.setDuplexConnection(configuration.isDuplex());
// set our properties

View File

@ -17,7 +17,6 @@
package org.apache.activemq.network;
import java.util.List;
import org.apache.activemq.command.ActiveMQDestination;
/**
@ -36,6 +35,7 @@ public class NetworkBridgeConfiguration {
private int prefetchSize = 1000;
private int networkTTL = 1;
private String brokerName = "localhost";
private String brokerURL = "";
private String userName;
private String password;
private String destinationFilter = ">";
@ -274,4 +274,18 @@ public class NetworkBridgeConfiguration {
public void setSuppressDuplicateQueueSubscriptions(boolean val) {
suppressDuplicateQueueSubscriptions = val;
}
/**
* @return the brokerURL
*/
public String getBrokerURL() {
return this.brokerURL;
}
/**
* @param brokerURL the brokerURL to set
*/
public void setBrokerURL(String brokerURL) {
this.brokerURL = brokerURL;
}
}

View File

@ -71,6 +71,9 @@ public class ConnectionControlMarshaller extends BaseCommandMarshaller {
info.setFaultTolerant(bs.readBoolean());
info.setResume(bs.readBoolean());
info.setSuspend(bs.readBoolean());
info.setConnectedBrokers(tightUnmarshalString(dataIn, bs));
info.setReconnectTo(tightUnmarshalString(dataIn, bs));
info.setRebalanceConnection(bs.readBoolean());
}
@ -88,6 +91,9 @@ public class ConnectionControlMarshaller extends BaseCommandMarshaller {
bs.writeBoolean(info.isFaultTolerant());
bs.writeBoolean(info.isResume());
bs.writeBoolean(info.isSuspend());
rc += tightMarshalString1(info.getConnectedBrokers(), bs);
rc += tightMarshalString1(info.getReconnectTo(), bs);
bs.writeBoolean(info.isRebalanceConnection());
return rc + 0;
}
@ -108,6 +114,9 @@ public class ConnectionControlMarshaller extends BaseCommandMarshaller {
bs.readBoolean();
bs.readBoolean();
bs.readBoolean();
tightMarshalString2(info.getConnectedBrokers(), dataOut, bs);
tightMarshalString2(info.getReconnectTo(), dataOut, bs);
bs.readBoolean();
}
@ -127,6 +136,9 @@ public class ConnectionControlMarshaller extends BaseCommandMarshaller {
info.setFaultTolerant(dataIn.readBoolean());
info.setResume(dataIn.readBoolean());
info.setSuspend(dataIn.readBoolean());
info.setConnectedBrokers(looseUnmarshalString(dataIn));
info.setReconnectTo(looseUnmarshalString(dataIn));
info.setRebalanceConnection(dataIn.readBoolean());
}
@ -144,6 +156,9 @@ public class ConnectionControlMarshaller extends BaseCommandMarshaller {
dataOut.writeBoolean(info.isFaultTolerant());
dataOut.writeBoolean(info.isResume());
dataOut.writeBoolean(info.isSuspend());
looseMarshalString(info.getConnectedBrokers(), dataOut);
looseMarshalString(info.getReconnectTo(), dataOut);
dataOut.writeBoolean(info.isRebalanceConnection());
}
}

View File

@ -85,6 +85,7 @@ public class ConnectionInfoMarshaller extends BaseCommandMarshaller {
info.setBrokerMasterConnector(bs.readBoolean());
info.setManageable(bs.readBoolean());
info.setClientMaster(bs.readBoolean());
info.setFaultTolerant(bs.readBoolean());
}
@ -105,6 +106,7 @@ public class ConnectionInfoMarshaller extends BaseCommandMarshaller {
bs.writeBoolean(info.isBrokerMasterConnector());
bs.writeBoolean(info.isManageable());
bs.writeBoolean(info.isClientMaster());
bs.writeBoolean(info.isFaultTolerant());
return rc + 0;
}
@ -128,6 +130,7 @@ public class ConnectionInfoMarshaller extends BaseCommandMarshaller {
bs.readBoolean();
bs.readBoolean();
bs.readBoolean();
bs.readBoolean();
}
@ -161,6 +164,7 @@ public class ConnectionInfoMarshaller extends BaseCommandMarshaller {
info.setBrokerMasterConnector(dataIn.readBoolean());
info.setManageable(dataIn.readBoolean());
info.setClientMaster(dataIn.readBoolean());
info.setFaultTolerant(dataIn.readBoolean());
}
@ -181,6 +185,7 @@ public class ConnectionInfoMarshaller extends BaseCommandMarshaller {
dataOut.writeBoolean(info.isBrokerMasterConnector());
dataOut.writeBoolean(info.isManageable());
dataOut.writeBoolean(info.isClientMaster());
dataOut.writeBoolean(info.isFaultTolerant());
}
}

View File

@ -21,7 +21,6 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.Service;
import org.apache.activemq.transport.CompositeTransport;
import org.apache.activemq.transport.Transport;
@ -46,7 +45,7 @@ public class ProxyConnector implements Service {
private URI remote;
private URI localUri;
private String name;
private CopyOnWriteArrayList<ProxyConnection> connections = new CopyOnWriteArrayList<ProxyConnection>();
private final CopyOnWriteArrayList<ProxyConnection> connections = new CopyOnWriteArrayList<ProxyConnection>();
public void start() throws Exception {
@ -131,13 +130,14 @@ public class ProxyConnector implements Service {
private Transport createRemoteTransport() throws Exception {
Transport transport = TransportFactory.compositeConnect(remote);
CompositeTransport ct = (CompositeTransport)transport.narrow(CompositeTransport.class);
CompositeTransport ct = transport.narrow(CompositeTransport.class);
if (ct != null && localUri != null) {
ct.add(new URI[] {localUri});
ct.add(false,new URI[] {localUri});
}
// Add a transport filter so that can track the transport life cycle
transport = new TransportFilter(transport) {
@Override
public void stop() throws Exception {
LOG.info("Stopping proxy.");
super.stop();

View File

@ -19,6 +19,6 @@ package org.apache.activemq.transport;
import java.net.URI;
public interface CompositeTransport extends Transport {
void add(URI[] uris);
void remove(URI[] uris);
void add(boolean rebalance,URI[] uris);
void remove(boolean rebalance,URI[] uris);
}

View File

@ -18,7 +18,6 @@ package org.apache.activemq.transport;
import java.io.IOException;
import java.net.URI;
import org.apache.activemq.Service;
/**
@ -147,6 +146,15 @@ public interface Transport extends Service {
*/
boolean isConnected();
/**
* @return true if reconnect is supported
*/
boolean isReconnectSupported();
/**
* @return true if updating uris is supported
*/
boolean isUpdateURIsSupported();
/**
* reconnect to another location
* @param uri
@ -154,6 +162,14 @@ public interface Transport extends Service {
*/
void reconnect(URI uri) throws IOException;
/**
* Provide a list of available alternative locations
* @param rebalance
* @param uris
* @throws IOException
*/
void updateURIs(boolean rebalance,URI[] uris) throws IOException;
/**
* Returns a counter which gets incremented as data is read from the transport.
* It should only be used to determine if there is progress being made in reading the next command from the transport.

View File

@ -45,7 +45,8 @@ public class TransportFilter implements TransportListener, Transport {
/**
* @see org.apache.activemq.Service#start()
* @throws IOException if the next channel has not been set.
* @throws IOException
* if the next channel has not been set.
*/
public void start() throws Exception {
if (next == null) {
@ -75,6 +76,7 @@ public class TransportFilter implements TransportListener, Transport {
return next;
}
@Override
public String toString() {
return next.toString();
}
@ -141,4 +143,16 @@ public class TransportFilter implements TransportListener, Transport {
public int getReceiveCounter() {
return next.getReceiveCounter();
}
public boolean isReconnectSupported() {
return next.isReconnectSupported();
}
public boolean isUpdateURIsSupported() {
return next.isUpdateURIsSupported();
}
public void updateURIs(boolean rebalance,URI[] uris) throws IOException {
next.updateURIs(rebalance,uris);
}
}

View File

@ -18,7 +18,6 @@ package org.apache.activemq.transport;
import java.io.IOException;
import java.net.URI;
import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -96,8 +95,9 @@ public abstract class TransportSupport extends ServiceSupport implements Transpo
try {
transportListener.onException(e);
} catch (RuntimeException e2) {
// Handle any unexpected runtime exceptions by debug logging them.
LOG.debug("Unexpected runtime exception: "+e2, e2);
// Handle any unexpected runtime exceptions by debug logging
// them.
LOG.debug("Unexpected runtime exception: " + e2, e2);
}
}
}
@ -112,11 +112,21 @@ public abstract class TransportSupport extends ServiceSupport implements Transpo
return false;
}
public void reconnect(URI uri) throws IOException {
throw new IOException("Not supported");
}
public boolean isReconnectSupported() {
return false;
}
public boolean isUpdateURIsSupported() {
return false;
}
public void updateURIs(boolean reblance,URI[] uris) throws IOException {
throw new IOException("Not supported");
}
public boolean isDisposed() {
return isStopped();
}

View File

@ -20,7 +20,6 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.command.DiscoveryEvent;
import org.apache.activemq.transport.CompositeTransport;
import org.apache.activemq.transport.TransportFilter;
@ -50,6 +49,7 @@ public class DiscoveryTransport extends TransportFilter implements DiscoveryList
this.next = next;
}
@Override
public void start() throws Exception {
if (discoveryAgent == null) {
throw new IllegalStateException("discoveryAgent not configured");
@ -61,6 +61,7 @@ public class DiscoveryTransport extends TransportFilter implements DiscoveryList
next.start();
}
@Override
public void stop() throws Exception {
ServiceStopper ss = new ServiceStopper();
ss.stop(discoveryAgent);
@ -75,7 +76,7 @@ public class DiscoveryTransport extends TransportFilter implements DiscoveryList
URI uri = new URI(url);
serviceURIs.put(event.getServiceName(), uri);
LOG.info("Adding new broker connection URL: " + uri);
next.add(new URI[] {URISupport.applyParameters(uri, parameters)});
next.add(false,new URI[] {URISupport.applyParameters(uri, parameters)});
} catch (URISyntaxException e) {
LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e);
}
@ -85,7 +86,7 @@ public class DiscoveryTransport extends TransportFilter implements DiscoveryList
public void onServiceRemove(DiscoveryEvent event) {
URI uri = serviceURIs.get(event.getServiceName());
if (uri != null) {
next.remove(new URI[] {uri});
next.remove(false,new URI[] {uri});
}
}

View File

@ -20,12 +20,11 @@ package org.apache.activemq.transport.failover;
import java.io.IOException;
import java.net.URI;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.Transport;
class BackupTransport extends DefaultTransportListener{
private FailoverTransport failoverTransport;
private final FailoverTransport failoverTransport;
private Transport transport;
private URI uri;
private boolean disposed;
@ -33,10 +32,11 @@ class BackupTransport extends DefaultTransportListener{
BackupTransport(FailoverTransport ft){
this.failoverTransport=ft;
}
@Override
public void onException(IOException error) {
this.disposed=true;
if (failoverTransport!=null) {
this.failoverTransport.reconnect();
this.failoverTransport.reconnect(false);
}
}
@ -62,10 +62,12 @@ class BackupTransport extends DefaultTransportListener{
this.disposed = disposed;
}
@Override
public int hashCode() {
return uri != null ? uri.hashCode():-1;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof BackupTransport) {
BackupTransport other = (BackupTransport) obj;

View File

@ -19,15 +19,18 @@ package org.apache.activemq.transport.failover;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionId;
@ -50,6 +53,7 @@ import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* A Transport that is made reliable by being able to fail over to another
* transport when a transport failure is detected.
@ -64,6 +68,7 @@ public class FailoverTransport implements CompositeTransport {
private boolean disposed;
private boolean connected;
private final CopyOnWriteArrayList<URI> uris = new CopyOnWriteArrayList<URI>();
private final CopyOnWriteArrayList<URI> updated = new CopyOnWriteArrayList<URI>();
private final Object reconnectMutex = new Object();
private final Object backupMutex = new Object();
@ -77,28 +82,28 @@ public class FailoverTransport implements CompositeTransport {
private final AtomicReference<Transport> connectedTransport = new AtomicReference<Transport>();
private final TaskRunner reconnectTask;
private boolean started;
private boolean initialized;
private long initialReconnectDelay = 10;
private long maxReconnectDelay = 1000 * 30;
private double backOffMultiplier = 2d;
private long timeout = -1;
private boolean useExponentialBackOff = true;
private boolean randomize = true;
private boolean initialized;
private int maxReconnectAttempts;
private int startupMaxReconnectAttempts;
private int connectFailures;
private long reconnectDelay = this.initialReconnectDelay;
private Exception connectionFailure;
private boolean firstConnection = true;
//optionally always have a backup created
private boolean backup=false;
private List<BackupTransport> backups=new CopyOnWriteArrayList<BackupTransport>();
private int backupPoolSize=1;
// optionally always have a backup created
private boolean backup = false;
private final List<BackupTransport> backups = new CopyOnWriteArrayList<BackupTransport>();
private int backupPoolSize = 1;
private boolean trackMessages = false;
private boolean trackTransactionProducers = true;
private int maxCacheSize = 128 * 1024;
private TransportListener disposedListener = new DefaultTransportListener() {};
private final TransportListener disposedListener = new DefaultTransportListener() {
};
private boolean connectionInterruptProcessingComplete;
private final TransportListener myTransportListener = createTransportListener();
@ -109,20 +114,20 @@ public class FailoverTransport implements CompositeTransport {
// Setup a task that is used to reconnect the a connection async.
reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
public boolean iterate() {
boolean result=false;
boolean buildBackup=true;
boolean result = false;
boolean buildBackup = true;
boolean doReconnect = !disposed;
synchronized(backupMutex) {
if (connectedTransport.get()==null && !disposed) {
result=doReconnect();
buildBackup=false;
synchronized (backupMutex) {
if (connectedTransport.get() == null && !disposed) {
result = doReconnect();
buildBackup = false;
}
}
if(buildBackup) {
if (buildBackup) {
buildBackups();
}else {
//build backups on the next iteration
result=true;
} else {
// build backups on the next iteration
result = true;
try {
reconnectTask.wakeup();
} catch (InterruptedException e) {
@ -138,32 +143,25 @@ public class FailoverTransport implements CompositeTransport {
TransportListener createTransportListener() {
return new TransportListener() {
public void onCommand(Object o) {
Command command = (Command)o;
Command command = (Command) o;
if (command == null) {
return;
}
if (command.isResponse()) {
Object object = null;
synchronized(requestMap) {
object = requestMap.remove(Integer.valueOf(((Response)command).getCorrelationId()));
synchronized (requestMap) {
object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId()));
}
if (object != null && object.getClass() == Tracked.class) {
((Tracked)object).onResponses();
((Tracked) object).onResponses();
}
}
if (!initialized) {
if (command.isBrokerInfo()) {
BrokerInfo info = (BrokerInfo)command;
BrokerInfo[] peers = info.getPeerBrokerInfos();
if (peers != null) {
for (int i = 0; i < peers.length; i++) {
String brokerString = peers[i].getBrokerURL();
add(brokerString);
}
}
initialized = true;
}
if(command.isConnectionControl()) {
handleConnectionControl((ConnectionControl) command);
}
if (transportListener != null) {
transportListener.onCommand(command);
@ -193,7 +191,6 @@ public class FailoverTransport implements CompositeTransport {
};
}
public final void handleTransportFailure(IOException e) throws InterruptedException {
if (LOG.isTraceEnabled()) {
LOG.trace(this + " handleTransportFailure: " + e);
@ -212,19 +209,21 @@ public class FailoverTransport implements CompositeTransport {
boolean reconnectOk = false;
synchronized (reconnectMutex) {
if(started) {
LOG.warn("Transport (" + transport.getRemoteAddress() + ") failed to " + connectedTransportURI+ " , attempting to automatically reconnect due to: " + e);
if (started) {
LOG.warn("Transport (" + transport.getRemoteAddress() + ") failed to " + connectedTransportURI
+ " , attempting to automatically reconnect due to: " + e);
LOG.debug("Transport failed with the following exception:", e);
reconnectOk = true;
}
initialized = false;
failedConnectTransportURI=connectedTransportURI;
failedConnectTransportURI = connectedTransportURI;
connectedTransportURI = null;
connected=false;
connected = false;
stateTracker.transportInterrupted();
// notify before any reconnect attempt so ack state can be whacked
// notify before any reconnect attempt so ack state can be
// whacked
if (transportListener != null) {
transportListener.transportInterupted();
}
@ -234,7 +233,49 @@ public class FailoverTransport implements CompositeTransport {
}
}
}
}
public final void handleConnectionControl(ConnectionControl control) {
String reconnectStr = control.getReconnectTo();
if (reconnectStr != null) {
reconnectStr = reconnectStr.trim();
if (reconnectStr.length() > 0) {
try {
URI uri = new URI(reconnectStr);
if (isReconnectSupported()) {
reconnect(uri);
LOG.info("Reconnected to: " + uri);
}
} catch (Exception e) {
LOG.error("Failed to handle ConnectionControl reconnect to " + reconnectStr, e);
}
}
}
String connectedStr = control.getConnectedBrokers();
if (connectedStr != null) {
connectedStr = connectedStr.trim();
if (connectedStr.length() > 0 && isUpdateURIsSupported()) {
List<URI> list = new ArrayList<URI>();
StringTokenizer tokenizer = new StringTokenizer(connectedStr, ",");
while (tokenizer.hasMoreTokens()) {
String str = tokenizer.nextToken();
try {
URI uri = new URI(str);
list.add(uri);
} catch (Exception e) {
LOG.error("Failed to parse broker address: " + str, e);
}
}
if (list.isEmpty() == false) {
try {
updateURIs(control.isRebalanceConnection(), list.toArray(new URI[list.size()]));
} catch (IOException e) {
LOG.error("Failed to update transport URI's from: " + connectedStr, e);
}
}
}
}
}
public void start() throws Exception {
@ -250,13 +291,13 @@ public class FailoverTransport implements CompositeTransport {
if (connectedTransport.get() != null) {
stateTracker.restore(connectedTransport.get());
} else {
reconnect();
reconnect(false);
}
}
}
public void stop() throws Exception {
Transport transportToStop=null;
Transport transportToStop = null;
synchronized (reconnectMutex) {
LOG.debug("Stopped.");
if (!started) {
@ -265,7 +306,7 @@ public class FailoverTransport implements CompositeTransport {
started = false;
disposed = true;
connected = false;
for (BackupTransport t:backups) {
for (BackupTransport t : backups) {
t.setDisposed(true);
}
backups.clear();
@ -279,7 +320,7 @@ public class FailoverTransport implements CompositeTransport {
sleepMutex.notifyAll();
}
reconnectTask.shutdown();
if( transportToStop!=null ) {
if (transportToStop != null) {
transportToStop.stop();
}
}
@ -356,7 +397,8 @@ public class FailoverTransport implements CompositeTransport {
}
/**
* @param randomize The randomize to set.
* @param randomize
* The randomize to set.
*/
public void setRandomize(boolean randomize) {
this.randomize = randomize;
@ -403,29 +445,30 @@ public class FailoverTransport implements CompositeTransport {
}
/**
* @return Returns true if the command is one sent when a connection
* is being closed.
* @return Returns true if the command is one sent when a connection is
* being closed.
*/
private boolean isShutdownCommand(Command command) {
return (command != null && (command.isShutdownInfo() || command instanceof RemoveInfo));
}
public void oneway(Object o) throws IOException {
Command command = (Command)o;
Command command = (Command) o;
Exception error = null;
try {
synchronized (reconnectMutex) {
if (isShutdownCommand(command) && connectedTransport.get() == null) {
if(command.isShutdownInfo()) {
// Skipping send of ShutdownInfo command when not connected.
if (command.isShutdownInfo()) {
// Skipping send of ShutdownInfo command when not
// connected.
return;
}
if(command instanceof RemoveInfo || command.isMessageAck()) {
// Simulate response to RemoveInfo command or ack (as it will be stale)
if (command instanceof RemoveInfo || command.isMessageAck()) {
// Simulate response to RemoveInfo command or ack (as it
// will be stale)
stateTracker.track(command);
Response response = new Response();
response.setCorrelationId(command.getCommandId());
@ -441,8 +484,7 @@ public class FailoverTransport implements CompositeTransport {
Transport transport = connectedTransport.get();
long start = System.currentTimeMillis();
boolean timedout = false;
while (transport == null && !disposed
&& connectionFailure == null
while (transport == null && !disposed && connectionFailure == null
&& !Thread.currentThread().isInterrupted()) {
LOG.trace("Waiting for transport to reconnect..: " + command);
long end = System.currentTimeMillis();
@ -469,7 +511,7 @@ public class FailoverTransport implements CompositeTransport {
error = connectionFailure;
} else if (timedout == true) {
error = new IOException("Failover timeout of " + timeout + " ms reached.");
}else {
} else {
error = new IOException("Unexpected failure.");
}
break;
@ -480,7 +522,7 @@ public class FailoverTransport implements CompositeTransport {
// then hold it in the requestMap so that we can replay
// it later.
Tracked tracked = stateTracker.track(command);
synchronized(requestMap) {
synchronized (requestMap) {
if (tracked != null && tracked.isWaitingForResponse()) {
requestMap.put(Integer.valueOf(command.getCommandId()), tracked);
} else if (tracked == null && command.isResponseRequired()) {
@ -531,7 +573,7 @@ public class FailoverTransport implements CompositeTransport {
if (!disposed) {
if (error != null) {
if (error instanceof IOException) {
throw (IOException)error;
throw (IOException) error;
}
throw IOExceptionSupport.create(error);
}
@ -550,38 +592,53 @@ public class FailoverTransport implements CompositeTransport {
throw new AssertionError("Unsupported Method");
}
public void add(URI u[]) {
public void add(boolean rebalance, URI u[]) {
boolean newURI = false;
for (int i = 0; i < u.length; i++) {
if (!uris.contains(u[i])) {
uris.add(u[i]);
if (contains(u[i])==false) {
uris.add(i, u[i]);
newURI = true;
}
}
reconnect();
if (newURI) {
reconnect(rebalance);
}
}
public void remove(URI u[]) {
public void remove(boolean rebalance, URI u[]) {
for (int i = 0; i < u.length; i++) {
uris.remove(u[i]);
}
reconnect();
reconnect(rebalance);
}
public void add(String u) {
public void add(boolean rebalance, String u) {
try {
URI uri = new URI(u);
if (!uris.contains(uri)) {
uris.add(uri);
URI newURI = new URI(u);
if (contains(newURI)==false) {
uris.add(newURI);
reconnect(rebalance);
}
reconnect();
} catch (Exception e) {
LOG.error("Failed to parse URI: " + u);
}
}
public void reconnect() {
public void reconnect(boolean rebalance) {
synchronized (reconnectMutex) {
if (started) {
if (rebalance) {
Transport transport = this.connectedTransport.getAndSet(null);
if (transport != null) {
try {
transport.stop();
} catch (Exception e) {
LOG.debug("Caught an exception stopping existing transport", e);
}
}
}
LOG.debug("Waking up reconnect task");
try {
reconnectTask.wakeup();
@ -603,7 +660,7 @@ public class FailoverTransport implements CompositeTransport {
if (randomize) {
// Randomly, reorder the list by random swapping
for (int i = 0; i < l.size(); i++) {
int p = (int) (Math.random()*100 % l.size());
int p = (int) (Math.random() * 100 % l.size());
URI t = l.get(p);
l.set(p, l.get(i));
l.set(i, t);
@ -621,7 +678,7 @@ public class FailoverTransport implements CompositeTransport {
}
public void setTransportListener(TransportListener commandListener) {
synchronized(listenerMutex) {
synchronized (listenerMutex) {
this.transportListener = commandListener;
listenerMutex.notifyAll();
}
@ -633,7 +690,7 @@ public class FailoverTransport implements CompositeTransport {
return target.cast(this);
}
Transport transport = connectedTransport.get();
if ( transport != null) {
if (transport != null) {
return transport.narrow(target);
}
return null;
@ -642,13 +699,13 @@ public class FailoverTransport implements CompositeTransport {
protected void restoreTransport(Transport t) throws Exception, IOException {
t.start();
//send information to the broker - informing it we are an ft client
// send information to the broker - informing it we are an ft client
ConnectionControl cc = new ConnectionControl();
cc.setFaultTolerant(true);
t.oneway(cc);
stateTracker.restore(t);
Map tmpMap = null;
synchronized(requestMap) {
synchronized (requestMap) {
tmpMap = new LinkedHashMap<Integer, Command>(requestMap);
}
for (Iterator<Command> iter2 = tmpMap.values().iterator(); iter2.hasNext();) {
@ -668,13 +725,14 @@ public class FailoverTransport implements CompositeTransport {
this.useExponentialBackOff = useExponentialBackOff;
}
@Override
public String toString() {
return connectedTransportURI == null ? "unconnected" : connectedTransportURI.toString();
}
public String getRemoteAddress() {
Transport transport = connectedTransport.get();
if ( transport != null) {
if (transport != null) {
return transport.getRemoteAddress();
}
return null;
@ -702,7 +760,7 @@ public class FailoverTransport implements CompositeTransport {
if (!useExponentialBackOff) {
reconnectDelay = initialReconnectDelay;
}
synchronized(backupMutex) {
synchronized (backupMutex) {
if (backup && !backups.isEmpty()) {
BackupTransport bt = backups.remove(0);
Transport t = bt.getTransport();
@ -713,21 +771,21 @@ public class FailoverTransport implements CompositeTransport {
restoreTransport(t);
}
reconnectDelay = initialReconnectDelay;
failedConnectTransportURI=null;
failedConnectTransportURI = null;
connectedTransportURI = uri;
connectedTransport.set(t);
reconnectMutex.notifyAll();
connectFailures = 0;
LOG.info("Successfully reconnected to backup " + uri);
return false;
}catch (Exception e) {
LOG.debug("Backup transport failed",e);
} catch (Exception e) {
LOG.debug("Backup transport failed", e);
}
}
}
Iterator<URI> iter = connectList.iterator();
while(iter.hasNext() && connectedTransport.get() == null && !disposed) {
while (iter.hasNext() && connectedTransport.get() == null && !disposed) {
URI uri = iter.next();
Transport t = null;
try {
@ -746,34 +804,36 @@ public class FailoverTransport implements CompositeTransport {
connectedTransport.set(t);
reconnectMutex.notifyAll();
connectFailures = 0;
// Make sure on initial startup, that the transportListener
// Make sure on initial startup, that the
// transportListener
// has been initialized for this instance.
synchronized(listenerMutex) {
if (transportListener==null) {
synchronized (listenerMutex) {
if (transportListener == null) {
try {
//if it isn't set after 2secs - it
//probably never will be
// if it isn't set after 2secs - it
// probably never will be
listenerMutex.wait(2000);
}catch(InterruptedException ex) {}
} catch (InterruptedException ex) {
}
}
}
if (transportListener != null) {
transportListener.transportResumed();
}else {
} else {
LOG.debug("transport resumed by transport listener not set");
}
if (firstConnection) {
firstConnection=false;
firstConnection = false;
LOG.info("Successfully connected to " + uri);
}else {
} else {
LOG.info("Successfully reconnected to " + uri);
}
connected=true;
connected = true;
return false;
} catch (Exception e) {
failure = e;
LOG.debug("Connect fail to: " + uri + ", reason: " + e);
if (t!=null) {
if (t != null) {
try {
t.stop();
} catch (Exception ee) {
@ -790,27 +850,28 @@ public class FailoverTransport implements CompositeTransport {
reconnectAttempts = this.startupMaxReconnectAttempts;
}
}
if (reconnectAttempts==0) {
if (reconnectAttempts == 0) {
reconnectAttempts = this.maxReconnectAttempts;
}
if (reconnectAttempts > 0 && ++connectFailures >= reconnectAttempts) {
LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
connectionFailure = failure;
// Make sure on initial startup, that the transportListener has been initialized
// Make sure on initial startup, that the transportListener has
// been initialized
// for this instance.
synchronized(listenerMutex) {
if (transportListener==null) {
synchronized (listenerMutex) {
if (transportListener == null) {
try {
listenerMutex.wait(2000);
}catch(InterruptedException ex) {}
} catch (InterruptedException ex) {
}
}
}
if(transportListener != null) {
if (transportListener != null) {
if (connectionFailure instanceof IOException) {
transportListener.onException((IOException)connectionFailure);
transportListener.onException((IOException) connectionFailure);
} else {
transportListener.onException(IOExceptionSupport.create(connectionFailure));
}
@ -841,21 +902,20 @@ public class FailoverTransport implements CompositeTransport {
return !disposed;
}
final boolean buildBackups() {
synchronized (backupMutex) {
if (!disposed && backup && backups.size() < backupPoolSize) {
List<URI> connectList = getConnectList();
//removed disposed backups
List<BackupTransport>disposedList = new ArrayList<BackupTransport>();
for (BackupTransport bt:backups) {
// removed disposed backups
List<BackupTransport> disposedList = new ArrayList<BackupTransport>();
for (BackupTransport bt : backups) {
if (bt.isDisposed()) {
disposedList.add(bt);
}
}
backups.removeAll(disposedList);
disposedList.clear();
for (Iterator<URI>iter = connectList.iterator();iter.hasNext() && backups.size() < backupPoolSize;) {
for (Iterator<URI> iter = connectList.iterator(); iter.hasNext() && backups.size() < backupPoolSize;) {
URI uri = iter.next();
if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
try {
@ -868,8 +928,8 @@ public class FailoverTransport implements CompositeTransport {
bt.setTransport(t);
backups.add(bt);
}
} catch(Exception e) {
LOG.debug("Failed to build backup ",e);
} catch (Exception e) {
LOG.debug("Failed to build backup ", e);
}
}
}
@ -882,18 +942,52 @@ public class FailoverTransport implements CompositeTransport {
return disposed;
}
public boolean isConnected() {
return connected;
}
public void reconnect(URI uri) throws IOException {
add(new URI[] {uri});
add(true, new URI[] { uri });
}
public boolean isReconnectSupported() {
return true;
}
public boolean isUpdateURIsSupported() {
return true;
}
public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException {
List<URI> copy = new ArrayList<URI>(this.updated);
List<URI> add = new ArrayList<URI>();
if (updatedURIs != null && updatedURIs.length > 0) {
Set<URI> set = new HashSet<URI>();
for (int i = 0; i < updatedURIs.length; i++) {
URI uri = updatedURIs[i];
if (uri != null) {
set.add(uri);
}
}
for (URI uri : set) {
if (copy.remove(uri) == false) {
add.add(uri);
}
}
synchronized (reconnectMutex) {
this.updated.clear();
this.updated.addAll(add);
for (URI uri : copy) {
this.uris.remove(uri);
}
add(rebalance, add.toArray(new URI[add.size()]));
}
}
}
public int getReceiveCounter() {
Transport transport = connectedTransport.get();
if( transport == null ) {
if (transport == null) {
return 0;
}
return transport.getReceiveCounter();
@ -904,4 +998,25 @@ public class FailoverTransport implements CompositeTransport {
stateTracker.connectionInterruptProcessingComplete(this, connectionId);
}
}
private boolean contains(URI newURI) {
boolean result = false;
try {
for (URI uri:uris) {
if (newURI.getPort()==uri.getPort()) {
InetAddress newAddr = InetAddress.getByName(newURI.getHost());
InetAddress addr = InetAddress.getByName(uri.getHost());
if (addr.equals(newAddr)) {
result = true;
break;
}
}
}
}catch(IOException e) {
result = true;
LOG.error("Failed to verify URI " + newURI + " already known: " + e);
}
return result;
}
}

View File

@ -20,7 +20,6 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport;
@ -32,6 +31,7 @@ import org.apache.activemq.util.URISupport.CompositeData;
public class FailoverTransportFactory extends TransportFactory {
@Override
public Transport doConnect(URI location) throws IOException {
try {
Transport transport = createTransport(URISupport.parseComposite(location));
@ -43,6 +43,7 @@ public class FailoverTransportFactory extends TransportFactory {
}
}
@Override
public Transport doCompositeConnect(URI location) throws IOException {
try {
return createTransport(URISupport.parseComposite(location));
@ -62,7 +63,7 @@ public class FailoverTransportFactory extends TransportFactory {
if (!options.isEmpty()) {
throw new IllegalArgumentException("Invalid connect parameters: " + options);
}
transport.add(compositData.getComponents());
transport.add(false,compositData.getComponents());
return transport;
}
@ -72,6 +73,7 @@ public class FailoverTransportFactory extends TransportFactory {
return transport;
}
@Override
public TransportServer doBind(URI location) throws IOException {
throw new IOException("Invalid server URI: " + location);
}

View File

@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
@ -65,7 +64,7 @@ public class FanoutTransport implements CompositeTransport {
private final TaskRunner reconnectTask;
private boolean started;
private ArrayList<FanoutTransportHandler> transports = new ArrayList<FanoutTransportHandler>();
private final ArrayList<FanoutTransportHandler> transports = new ArrayList<FanoutTransportHandler>();
private int connectedCount;
private int minAckCount = 2;
@ -73,7 +72,7 @@ public class FanoutTransport implements CompositeTransport {
private long initialReconnectDelay = 10;
private long maxReconnectDelay = 1000 * 30;
private long backOffMultiplier = 2;
private boolean useExponentialBackOff = true;
private final boolean useExponentialBackOff = true;
private int maxReconnectAttempts;
private Exception connectionFailure;
private FanoutTransportHandler primary;
@ -89,6 +88,7 @@ public class FanoutTransport implements CompositeTransport {
this.ackCount = new AtomicInteger(count);
}
@Override
public String toString() {
return command.getCommandId() + "=" + ackCount.get();
}
@ -107,6 +107,7 @@ public class FanoutTransport implements CompositeTransport {
this.uri = uri;
}
@Override
public void onCommand(Object o) {
Command command = (Command)o;
if (command.isResponse()) {
@ -125,6 +126,7 @@ public class FanoutTransport implements CompositeTransport {
}
}
@Override
public void onException(IOException error) {
try {
synchronized (reconnectMutex) {
@ -499,7 +501,7 @@ public class FanoutTransport implements CompositeTransport {
}
}
public void add(URI uris[]) {
public void add(boolean reblance,URI uris[]) {
synchronized (reconnectMutex) {
for (int i = 0; i < uris.length; i++) {
@ -523,7 +525,7 @@ public class FanoutTransport implements CompositeTransport {
}
public void remove(URI uris[]) {
public void remove(boolean rebalance,URI uris[]) {
synchronized (reconnectMutex) {
for (int i = 0; i < uris.length; i++) {
@ -546,10 +548,21 @@ public class FanoutTransport implements CompositeTransport {
}
public void reconnect(URI uri) throws IOException {
add(new URI[]{uri});
add(true,new URI[]{uri});
}
public boolean isReconnectSupported() {
return true;
}
public boolean isUpdateURIsSupported() {
return true;
}
public void updateURIs(boolean reblance,URI[] uris) throws IOException {
add(reblance,uris);
}
public String getRemoteAddress() {
if (primary != null) {

View File

@ -18,7 +18,6 @@ package org.apache.activemq.transport.mock;
import java.io.IOException;
import java.net.URI;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
@ -70,6 +69,7 @@ public class MockTransport extends DefaultTransportListener implements Transport
getNext().stop();
}
@Override
public void onCommand(Object command) {
getTransportListener().onCommand(command);
}
@ -88,6 +88,7 @@ public class MockTransport extends DefaultTransportListener implements Transport
return transportListener;
}
@Override
public String toString() {
return getNext().toString();
}
@ -108,6 +109,7 @@ public class MockTransport extends DefaultTransportListener implements Transport
return getNext().request(command, timeout);
}
@Override
public void onException(IOException error) {
getTransportListener().onException(error);
}
@ -155,4 +157,16 @@ public class MockTransport extends DefaultTransportListener implements Transport
public int getReceiveCounter() {
return getNext().getReceiveCounter();
}
public boolean isReconnectSupported() {
return getNext().isReconnectSupported();
}
public boolean isUpdateURIsSupported() {
return getNext().isUpdateURIsSupported();
}
public void updateURIs(boolean reblance,URI[] uris) throws IOException {
getNext().updateURIs(reblance,uris);
}
}

View File

@ -134,6 +134,10 @@ public class StompFrame implements Command {
return false;
}
public boolean isConnectionControl() {
return false;
}
public boolean isWireFormatInfo() {
return false;
}

View File

@ -35,9 +35,7 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.SocketFactory;
import org.apache.activemq.Service;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportLoggerFactory;
@ -171,6 +169,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
/**
* @return pretty print of 'this'
*/
@Override
public String toString() {
return "tcp://" + socket.getInetAddress() + ":" + socket.getPort();
}
@ -398,6 +397,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
}
}
@Override
protected void doStart() throws Exception {
connect();
stoppedLatch.set(new CountDownLatch(1));
@ -454,6 +454,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
initializeStreams();
}
@Override
protected void doStop(ServiceStopper stopper) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("Stopping transport " + this);

View File

@ -21,7 +21,6 @@ import java.net.URI;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
@ -275,6 +274,7 @@ public class VMTransport implements Transport, Task {
this.network = network;
}
@Override
public String toString() {
return location + "#" + id;
}
@ -345,6 +345,17 @@ public class VMTransport implements Transport, Task {
throw new IOException("Not supported");
}
public boolean isReconnectSupported() {
return false;
}
public boolean isUpdateURIsSupported() {
return false;
}
public void updateURIs(boolean reblance,URI[] uris) throws IOException {
throw new IOException("Not supported");
}
public int getReceiveCounter() {
return receiveCounter;
}

View File

@ -56,5 +56,8 @@ public class ConnectionControlTest extends BaseCommandTestSupport {
info.setFaultTolerant(true);
info.setResume(false);
info.setSuspend(true);
info.setConnectedBrokers("ConnectedBrokers:1");
info.setReconnectTo("ReconnectTo:2");
info.setRebalanceConnection(false);
}
}

View File

@ -65,5 +65,6 @@ public class ConnectionInfoTest extends BaseCommandTestSupport {
info.setBrokerMasterConnector(true);
info.setManageable(false);
info.setClientMaster(true);
info.setFaultTolerant(false);
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.perf;
import java.io.File;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBStore;
@ -31,6 +32,13 @@ public class KahaDBDurableTopicTest extends SimpleDurableTopicTest {
super.setUp();
}
@Override
protected ActiveMQConnectionFactory createConnectionFactory(String uri) throws Exception {
ActiveMQConnectionFactory result = new ActiveMQConnectionFactory(uri);
//result.setDispatchAsync(false);
return result;
}
@Override
protected void configureBroker(BrokerService answer, String uri) throws Exception {
@ -52,7 +60,7 @@ public class KahaDBDurableTopicTest extends SimpleDurableTopicTest {
// small batch means more frequent and smaller writes
kaha.setIndexWriteBatchSize(100);
kaha.setIndexCacheSize(10000);
kaha.setIndexCacheSize(1000);
// do the index write in a separate thread
//kaha.setEnableIndexWriteAsync(true);

View File

@ -27,7 +27,7 @@ public class KahaDBQueueTest extends SimpleQueueTest {
@Override
protected void setUp() throws Exception {
this.initialConsumerDelay = 10 * 1000;
// this.initialConsumerDelay = 10 * 1000;
super.setUp();
}
@Override
@ -43,7 +43,7 @@ public class KahaDBQueueTest extends SimpleQueueTest {
// The setEnableJournalDiskSyncs(false) setting is a little dangerous right now, as I have not verified
// what happens if the index is updated but a journal update is lost.
// Index is going to be in consistent, but can it be repaired?
//kaha.setEnableJournalDiskSyncs(false);
kaha.setEnableJournalDiskSyncs(false);
// Using a bigger journal file size makes he take fewer spikes as it is not switching files as often.
//kaha.setJournalMaxFileLength(1024*1024*100);
@ -51,6 +51,7 @@ public class KahaDBQueueTest extends SimpleQueueTest {
kaha.setIndexWriteBatchSize(100);
// do the index write in a separate thread
kaha.setEnableIndexWriteAsync(true);
kaha.setIndexCacheSize(10000);
answer.setPersistenceAdapter(kaha);
answer.addConnector(uri);

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.perf;
import java.io.File;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBStore;
public class RunBroker {
public static void main(String arg[]) {
try {
KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter();
File dataFileDir = new File("target/test-amq-data/perfTest/kahadb");
File archiveDir = new File(dataFileDir,"archive");
KahaDBStore kahaDB = new KahaDBStore();
kahaDB.setDirectory(dataFileDir);
kahaDB.setDirectoryArchive(archiveDir);
kahaDB.setArchiveDataLogs(true);
// The setEnableJournalDiskSyncs(false) setting is a little dangerous right now, as I have not verified
// what happens if the index is updated but a journal update is lost.
// Index is going to be in consistent, but can it be repaired?
//kaha.setEnableJournalDiskSyncs(false);
// Using a bigger journal file size makes he take fewer spikes as it is not switching files as often.
//kaha.setJournalMaxFileLength(1024*1024*100);
// small batch means more frequent and smaller writes
kahaDB.setIndexWriteBatchSize(1000);
kahaDB.setIndexCacheSize(10000);
// do the index write in a separate thread
kahaDB.setEnableIndexWriteAsync(true);
BrokerService broker = new BrokerService();
broker.setUseJmx(false);
//broker.setPersistenceAdapter(adaptor);
//broker.setPersistenceAdapter(kahaDB);
broker.setPersistent(false);
broker.setDeleteAllMessagesOnStartup(true);
broker.addConnector("tcp://0.0.0.0:61616");
broker.start();
System.err.println("Running");
Thread.sleep(Long.MAX_VALUE);
}catch(Throwable e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,149 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.failover;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.network.NetworkConnector;
public class FailoverClusterTest extends TestCase {
private static final int NUMBER = 10;
private static final String BROKER_A_BIND_ADDRESS = "tcp://0.0.0.0:61616";
private static final String BROKER_B_BIND_ADDRESS = "tcp://0.0.0.0:61617";
private static final String CLIENT_URL = "failover://("+BROKER_A_BIND_ADDRESS+")";
private static final String BROKER_A_NAME = "BROKERA";
private static final String BROKER_B_NAME = "BROKERB";
private BrokerService brokerA;
private BrokerService brokerB;
private final List<ActiveMQConnection>connections = new ArrayList<ActiveMQConnection>();
public void testClusterConnectedAfterClients() throws Exception{
createClients();
if (brokerB == null) {
brokerB = createBrokerB(BROKER_B_BIND_ADDRESS);
}
Thread.sleep(3000);
Set<String> set = new HashSet<String>();
for (ActiveMQConnection c:connections) {
set.add(c.getTransportChannel().getRemoteAddress());
}
assertTrue(set.size() > 1);
}
public void testClusterConnectedBeforeClients() throws Exception{
if (brokerB == null) {
brokerB = createBrokerB(BROKER_B_BIND_ADDRESS);
}
Thread.sleep(5000);
createClients();
Thread.sleep(2000);
brokerA.stop();
Thread.sleep(2000);
URI brokerBURI = new URI(BROKER_B_BIND_ADDRESS);
for (ActiveMQConnection c:connections) {
String addr = c.getTransportChannel().getRemoteAddress();
assertTrue(addr.indexOf(""+brokerBURI.getPort()) > 0);
}
}
@Override
protected void setUp() throws Exception {
if (brokerA == null) {
brokerA = createBrokerA(BROKER_A_BIND_ADDRESS);
}
}
@Override
protected void tearDown() throws Exception {
for (Connection c:connections) {
c.close();
}
if (brokerB != null) {
brokerB.stop();
brokerB = null;
}
if (brokerA != null) {
brokerA.stop();
brokerA = null;
}
}
protected BrokerService createBrokerA(String uri) throws Exception {
BrokerService answer = new BrokerService();
configureConsumerBroker(answer,uri);
answer.start();
return answer;
}
protected void configureConsumerBroker(BrokerService answer,String uri) throws Exception {
answer.setBrokerName(BROKER_A_NAME);
answer.setPersistent(false);
TransportConnector connector = answer.addConnector(uri);
connector.setRebalanceClusterClients(true);
connector.setUpdateClusterClients(true);
answer.setUseShutdownHook(false);
}
protected BrokerService createBrokerB(String uri) throws Exception {
BrokerService answer = new BrokerService();
configureNetwork(answer,uri);
answer.start();
return answer;
}
protected void configureNetwork(BrokerService answer,String uri) throws Exception {
answer.setBrokerName(BROKER_B_NAME);
answer.setPersistent(false);
NetworkConnector network = answer.addNetworkConnector("static://"+BROKER_A_BIND_ADDRESS);
network.setDuplex(true);
TransportConnector connector =answer.addConnector(uri);
connector.setRebalanceClusterClients(true);
connector.setUpdateClusterClients(true);
answer.setUseShutdownHook(false);
}
protected void createClients() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(CLIENT_URL);
for (int i =0;i < NUMBER; i++) {
ActiveMQConnection c = (ActiveMQConnection) factory.createConnection();
c.start();
Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = s.createQueue(getClass().getName());
MessageConsumer consumer = s.createConsumer(queue);
connections.add(c);
}
}
}