Fix for https://issues.apache.org/jira/browse/AMQ-4165 - remove pure master/slave functionality

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1408651 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2012-11-13 09:31:37 +00:00
parent 7f346a8b79
commit 39ff4d9fd8
7 changed files with 8 additions and 983 deletions

View File

@ -21,7 +21,6 @@ import org.apache.activemq.ConfigurationException;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
import org.apache.activemq.broker.ft.MasterConnector;
import org.apache.activemq.broker.jmx.*;
import org.apache.activemq.broker.region.*;
import org.apache.activemq.broker.region.policy.PolicyMap;
@ -78,7 +77,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
* @org.apache.xbean.XBean
*/
public class BrokerService implements Service {
protected CountDownLatch slaveStartSignal = new CountDownLatch(1);
public static final String DEFAULT_PORT = "61616";
public static final String LOCAL_HOST_NAME;
public static final String BROKER_VERSION;
@ -123,8 +121,6 @@ public class BrokerService implements Service {
private final List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
private final List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
private final List<Service> services = new ArrayList<Service>();
private MasterConnector masterConnector;
private String masterConnectorURI;
private transient Thread shutdownHook;
private String[] transportConnectorURIs;
private String[] networkConnectorURIs;
@ -396,28 +392,11 @@ public class BrokerService implements Service {
return null;
}
/**
* @return Returns the masterConnectorURI.
*/
public String getMasterConnectorURI() {
return masterConnectorURI;
}
/**
* @param masterConnectorURI
* The masterConnectorURI to set.
*/
public void setMasterConnectorURI(String masterConnectorURI) {
this.masterConnectorURI = masterConnectorURI;
}
/**
* @return true if this Broker is a slave to a Master
*/
public boolean isSlave() {
return (masterConnector != null && masterConnector.isSlave()) ||
(masterConnector != null && masterConnector.isStoppedBeforeStart()) ||
(masterConnector == null && slave);
return slave;
}
public void masterFailed() {
@ -624,22 +603,11 @@ public class BrokerService implements Service {
managedBroker.setContextBroker(broker);
adminView.setBroker(managedBroker);
}
// see if there is a MasterBroker service and if so, configure
// it and start it.
for (Service service : services) {
if (service instanceof MasterConnector) {
configureService(service);
service.start();
}
}
if (!isSlave() && (masterConnector == null || isShutdownOnMasterFailure() == false)) {
if (!isSlave()) {
startAllConnectors();
}
if (!stopped.get()) {
if (isUseJmx() && masterConnector != null) {
registerFTConnectorMBean(masterConnector);
}
}
if (ioExceptionHandler == null) {
setIoExceptionHandler(new DefaultIOExceptionHandler());
}
@ -723,24 +691,7 @@ public class BrokerService implements Service {
stopped.set(true);
stoppedLatch.countDown();
}
if (masterConnectorURI == null) {
// master start has not finished yet
if (slaveStartSignal.getCount() == 1) {
started.set(false);
slaveStartSignal.countDown();
}
} else {
for (Service service : services) {
if (service instanceof MasterConnector) {
MasterConnector mConnector = (MasterConnector) service;
if (!mConnector.isSlave()) {
// means should be slave but not connected to master yet
started.set(false);
mConnector.stopBeforeConnected();
}
}
}
}
if (this.taskRunnerFactory != null) {
this.taskRunnerFactory.shutdown();
this.taskRunnerFactory = null;
@ -1794,20 +1745,6 @@ public class BrokerService implements Service {
addJmsConnector(jmsBridgeConnectors[i]);
}
}
for (Service service : services) {
if (service instanceof MasterConnector) {
masterServiceExists = true;
break;
}
}
if (masterConnectorURI != null) {
if (masterServiceExists) {
throw new IllegalStateException(
"Cannot specify masterConnectorURI when a masterConnector is already registered via the services property");
} else {
addService(new MasterConnector(masterConnectorURI));
}
}
}
protected void checkSystemUsageLimits() throws IOException {
@ -2005,16 +1942,7 @@ public class BrokerService implements Service {
}
}
protected void registerFTConnectorMBean(MasterConnector connector) throws IOException {
FTConnectorView view = new FTConnectorView(connector);
try {
ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
+ JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=MasterConnector");
AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
} catch (Throwable e) {
throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
}
}
protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException {
JmsConnectorView view = new JmsConnectorView(connector);
@ -2304,20 +2232,6 @@ public class BrokerService implements Service {
return BrokerSupport.getConnectionContext(getBroker());
}
protected void waitForSlave() {
try {
if (!slaveStartSignal.await(waitForSlaveTimeout, TimeUnit.MILLISECONDS)) {
throw new IllegalStateException("Gave up waiting for slave to start after " + waitForSlaveTimeout + " milliseconds.");
}
} catch (InterruptedException e) {
LOG.error("Exception waiting for slave:" + e);
}
}
protected void slaveConnectionEstablished() {
slaveStartSignal.countDown();
}
protected void startManagementContext() throws Exception {
getManagementContext().setBrokerName(brokerName);
getManagementContext().start();
@ -2351,9 +2265,7 @@ public class BrokerService implements Service {
map.put("network", "true");
map.put("async", "false");
uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
if (isWaitForSlave()) {
waitForSlave();
}
if (!stopped.get()) {
ThreadPoolExecutor networkConnectorStartExecutor = null;
if (isNetworkConnectorStartAsync()) {
@ -2445,12 +2357,6 @@ public class BrokerService implements Service {
BrokerServiceAware serviceAware = (BrokerServiceAware) service;
serviceAware.setBrokerService(this);
}
if (masterConnector == null) {
if (service instanceof MasterConnector) {
masterConnector = (MasterConnector) service;
supportFailOver = true;
}
}
}
public void handleIOException(IOException exception) {
@ -2621,10 +2527,6 @@ public class BrokerService implements Service {
this.waitForSlaveTimeout = waitForSlaveTimeout;
}
public CountDownLatch getSlaveStartSignal() {
return slaveStartSignal;
}
/**
* Get the passiveSlave
* @return the passiveSlave

View File

@ -37,7 +37,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.transaction.xa.XAResource;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.ft.MasterBroker;
import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.*;
@ -86,7 +85,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
protected TaskRunner taskRunner;
protected final AtomicReference<IOException> transportException = new AtomicReference<IOException>();
protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
private MasterBroker masterBroker;
private final Transport transport;
private MessageAuthorizationPolicy messageAuthorizationPolicy;
private WireFormatInfo wireFormatInfo;
@ -645,13 +643,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
public Response processAddConnection(ConnectionInfo info) throws Exception {
// if the broker service has slave attached, wait for the slave to be
// attached to allow client connection. slave connection is fine
if (!info.isBrokerMasterConnector() && connector.getBrokerService().isWaitForSlave()
&& connector.getBrokerService().getSlaveStartSignal().getCount() == 1) {
ServiceSupport.dispose(transport);
return new ExceptionResponse(new Exception("Master's slave not attached yet."));
}
// Older clients should have been defaulting this field to true.. but
// they were not.
if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
@ -1023,9 +1014,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
connector.onStopped(this);
try {
synchronized (this) {
if (masterBroker != null) {
masterBroker.stop();
}
if (duplexBridge != null) {
duplexBridge.stop();
}
@ -1216,19 +1204,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
public Response processBrokerInfo(BrokerInfo info) {
if (info.isSlaveBroker()) {
BrokerService bService = connector.getBrokerService();
// Do we only support passive slaves - or does the slave want to be
// passive ?
boolean passive = bService.isPassiveSlave() || info.isPassiveSlave();
if (passive == false) {
// stream messages from this broker (the master) to
// the slave
MutableBrokerFilter parent = (MutableBrokerFilter) broker.getAdaptor(MutableBrokerFilter.class);
masterBroker = new MasterBroker(parent, transport);
masterBroker.startProcessing();
}
LOG.info((passive ? "Passive" : "Active") + " Slave Broker " + info.getBrokerName() + " is attached");
bService.slaveConnectionEstablished();
LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: " + info.getBrokerName());
} else if (info.isNetworkConnection() && info.isDuplexConnection()) {
// so this TransportConnection is the rear end of a network bridge
// We have been requested to create a two way pipe ...

View File

@ -1,400 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.ft;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.InsertableMutableBrokerFilter;
import org.apache.activemq.broker.MutableBrokerFilter;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The Message Broker which passes messages to a slave
*
*
*/
public class MasterBroker extends InsertableMutableBrokerFilter {
private static final Logger LOG = LoggerFactory.getLogger(MasterBroker.class);
private Transport slave;
private AtomicBoolean started = new AtomicBoolean(false);
private Map<ConsumerId, ConsumerId> consumers = new ConcurrentHashMap<ConsumerId, ConsumerId>();
/**
* Constructor
*
* @param parent
* @param transport
*/
public MasterBroker(MutableBrokerFilter parent, Transport transport) {
super(parent);
this.slave = transport;
this.slave = new MutexTransport(slave);
this.slave = new ResponseCorrelator(slave);
this.slave.setTransportListener(transport.getTransportListener());
}
/**
* start processing this broker
*/
public void startProcessing() {
started.set(true);
try {
Connection[] connections = getClients();
ConnectionControl command = new ConnectionControl();
command.setFaultTolerant(true);
if (connections != null) {
for (int i = 0; i < connections.length; i++) {
if (connections[i].isActive() && connections[i].isManageable()) {
connections[i].dispatchAsync(command);
}
}
}
} catch (Exception e) {
LOG.error("Failed to get Connections", e);
}
}
/**
* stop the broker
*
* @throws Exception
*/
public void stop() throws Exception {
stopProcessing();
}
/**
* stop processing this broker
*/
public void stopProcessing() {
if (started.compareAndSet(true, false)) {
remove();
}
}
/**
* A client is establishing a connection with the broker.
*
* @param context
* @param info
* @throws Exception
*/
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
super.addConnection(context, info);
sendAsyncToSlave(info);
}
/**
* A client is disconnecting from the broker.
*
* @param context the environment the operation is being executed under.
* @param info
* @param error null if the client requested the disconnect or the error
* that caused the client to disconnect.
* @throws Exception
*/
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
super.removeConnection(context, info, error);
sendAsyncToSlave(new RemoveInfo(info.getConnectionId()));
}
/**
* Adds a session.
*
* @param context
* @param info
* @throws Exception
*/
public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
super.addSession(context, info);
sendAsyncToSlave(info);
}
/**
* Removes a session.
*
* @param context
* @param info
* @throws Exception
*/
public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
super.removeSession(context, info);
sendAsyncToSlave(new RemoveInfo(info.getSessionId()));
}
/**
* Adds a producer.
*
* @param context the enviorment the operation is being executed under.
* @param info
* @throws Exception
*/
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
super.addProducer(context, info);
sendAsyncToSlave(info);
}
/**
* Removes a producer.
*
* @param context the environment the operation is being executed under.
* @param info
* @throws Exception
*/
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
super.removeProducer(context, info);
sendAsyncToSlave(new RemoveInfo(info.getProducerId()));
}
/**
* add a consumer
*
* @param context
* @param info
* @return the associated subscription
* @throws Exception
*/
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
sendSyncToSlave(info);
consumers.put(info.getConsumerId(), info.getConsumerId());
return super.addConsumer(context, info);
}
@Override
public void removeConsumer(ConnectionContext context, ConsumerInfo info)
throws Exception {
super.removeConsumer(context, info);
consumers.remove(info.getConsumerId());
sendSyncToSlave(new RemoveInfo(info.getConsumerId()));
}
/**
* remove a subscription
*
* @param context
* @param info
* @throws Exception
*/
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
super.removeSubscription(context, info);
sendAsyncToSlave(info);
}
@Override
public void addDestinationInfo(ConnectionContext context,
DestinationInfo info) throws Exception {
super.addDestinationInfo(context, info);
if (info.getDestination().isTemporary()) {
sendAsyncToSlave(info);
}
}
@Override
public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
super.removeDestinationInfo(context, info);
if (info.getDestination().isTemporary()) {
sendAsyncToSlave(info);
}
}
/**
* begin a transaction
*
* @param context
* @param xid
* @throws Exception
*/
public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
TransactionInfo info = new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.BEGIN);
sendAsyncToSlave(info);
super.beginTransaction(context, xid);
}
/**
* Prepares a transaction. Only valid for xa transactions.
*
* @param context
* @param xid
* @return the state
* @throws Exception
*/
public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
TransactionInfo info = new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.PREPARE);
sendSyncToSlave(info);
int result = super.prepareTransaction(context, xid);
return result;
}
/**
* Rollsback a transaction.
*
* @param context
* @param xid
* @throws Exception
*/
public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
TransactionInfo info = new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.ROLLBACK);
sendAsyncToSlave(info);
super.rollbackTransaction(context, xid);
}
/**
* Commits a transaction.
*
* @param context
* @param xid
* @param onePhase
* @throws Exception
*/
public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
TransactionInfo info = new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.COMMIT_ONE_PHASE);
sendSyncToSlave(info);
super.commitTransaction(context, xid, onePhase);
}
/**
* Forgets a transaction.
*
* @param context
* @param xid
* @throws Exception
*/
public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception {
TransactionInfo info = new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.FORGET);
sendAsyncToSlave(info);
super.forgetTransaction(context, xid);
}
/**
* Notifiy the Broker that a dispatch will happen
* Do in 'pre' so that slave will avoid getting ack before dispatch
* similar logic to send() below.
* @param messageDispatch
*/
public void preProcessDispatch(MessageDispatch messageDispatch) {
super.preProcessDispatch(messageDispatch);
MessageDispatchNotification mdn = new MessageDispatchNotification();
mdn.setConsumerId(messageDispatch.getConsumerId());
mdn.setDeliverySequenceId(messageDispatch.getDeliverySequenceId());
mdn.setDestination(messageDispatch.getDestination());
if (messageDispatch.getMessage() != null) {
Message msg = messageDispatch.getMessage();
mdn.setMessageId(msg.getMessageId());
if (consumers.containsKey(messageDispatch.getConsumerId())) {
sendSyncToSlave(mdn);
}
}
}
/**
* @param context
* @param message
* @throws Exception
*/
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
/**
* A message can be dispatched before the super.send() method returns so -
* here the order is switched to avoid problems on the slave with
* receiving acks for messages not received yet
* copy ensures we don't mess with the correlator and command ids
*/
sendSyncToSlave(message.copy());
super.send(producerExchange, message);
}
/**
* @param context
* @param ack
* @throws Exception
*/
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
sendToSlave(ack);
super.acknowledge(consumerExchange, ack);
}
public boolean isFaultTolerantConfiguration() {
return true;
}
protected void sendToSlave(Message message) {
if (message.isResponseRequired()) {
sendSyncToSlave(message);
} else {
sendAsyncToSlave(message);
}
}
protected void sendToSlave(MessageAck ack) {
if (ack.isResponseRequired()) {
sendAsyncToSlave(ack);
} else {
sendSyncToSlave(ack);
}
}
protected void sendAsyncToSlave(Command command) {
try {
slave.oneway(command);
} catch (Throwable e) {
LOG.error("Slave Failed", e);
stopProcessing();
}
}
protected void sendSyncToSlave(Command command) {
try {
Response response = (Response)slave.request(command);
if (response.isException()) {
ExceptionResponse er = (ExceptionResponse)response;
LOG.error("Slave Failed", er.getException());
}
} catch (Throwable e) {
LOG.error("Slave Failed", e);
}
}
}

View File

@ -1,369 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.ft;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.Service;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Connects a Slave Broker to a Master when using <a
* href="http://activemq.apache.org/masterslave.html">Master Slave</a> for High
* Availability of messages.
*
* @org.apache.xbean.XBean
*
*/
public class MasterConnector implements Service, BrokerServiceAware {
private static final Logger LOG = LoggerFactory.getLogger(MasterConnector.class);
private BrokerService broker;
private URI remoteURI;
private URI localURI;
private Transport localBroker;
private Transport remoteBroker;
private TransportConnector connector;
private AtomicBoolean started = new AtomicBoolean(false);
private AtomicBoolean stoppedBeforeStart = new AtomicBoolean(false);
private final IdGenerator idGenerator = new IdGenerator();
private String userName;
private String password;
private ConnectionInfo connectionInfo;
private SessionInfo sessionInfo;
private ProducerInfo producerInfo;
private final AtomicBoolean masterActive = new AtomicBoolean();
private BrokerInfo brokerInfo;
private boolean firstConnection=true;
private boolean failedToStart;
public MasterConnector() {
}
public MasterConnector(String remoteUri) throws URISyntaxException {
remoteURI = new URI(remoteUri);
}
public void setBrokerService(BrokerService broker) {
this.broker = broker;
if (localURI == null) {
localURI = broker.getVmConnectorURI();
}
if (connector == null) {
List transportConnectors = broker.getTransportConnectors();
if (!transportConnectors.isEmpty()) {
connector = (TransportConnector)transportConnectors.get(0);
}
}
}
public boolean isSlave() {
return masterActive.get();
}
protected void restartBridge() throws Exception {
localBroker.oneway(connectionInfo);
remoteBroker.oneway(connectionInfo);
localBroker.oneway(sessionInfo);
remoteBroker.oneway(sessionInfo);
remoteBroker.oneway(producerInfo);
remoteBroker.oneway(brokerInfo);
}
public void start() throws Exception {
if (!started.compareAndSet(false, true)) {
return;
}
if (remoteURI == null) {
throw new IllegalArgumentException("You must specify a remoteURI");
}
localBroker = TransportFactory.connect(localURI);
remoteBroker = TransportFactory.connect(remoteURI);
LOG.info("Starting a slave connection between " + localBroker + " and " + remoteBroker);
localBroker.setTransportListener(new DefaultTransportListener() {
public void onCommand(Object command) {
}
public void onException(IOException error) {
if (started.get()) {
serviceLocalException(error);
}
}
});
remoteBroker.setTransportListener(new DefaultTransportListener() {
public void onCommand(Object o) {
Command command = (Command)o;
if (started.get()) {
serviceRemoteCommand(command);
}
}
public void onException(IOException error) {
if (started.get()) {
serviceRemoteException(error);
}
}
public void transportResumed() {
try{
if(!firstConnection){
localBroker = TransportFactory.connect(localURI);
localBroker.setTransportListener(new DefaultTransportListener() {
public void onCommand(Object command) {
}
public void onException(IOException error) {
if (started.get()) {
serviceLocalException(error);
}
}
});
localBroker.start();
restartBridge();
LOG.info("Slave connection between " + localBroker + " and " + remoteBroker + " has been reestablished.");
}else{
firstConnection=false;
}
}catch(IOException e){
LOG.error("MasterConnector failed to send BrokerInfo in transportResumed:", e);
}catch(Exception e){
LOG.error("MasterConnector failed to restart localBroker in transportResumed:", e);
}
}
});
try {
localBroker.start();
remoteBroker.start();
startBridge();
masterActive.set(true);
} catch (Exception e) {
masterActive.set(false);
if(!stoppedBeforeStart.get()){
LOG.error("Failed to start network bridge: " + e, e);
}else{
LOG.info("Slave stopped before connected to the master.");
}
setFailedToStart(true);
}
}
protected void startBridge() throws Exception {
connectionInfo = new ConnectionInfo();
connectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
connectionInfo.setClientId(idGenerator.generateId());
connectionInfo.setUserName(userName);
connectionInfo.setPassword(password);
connectionInfo.setBrokerMasterConnector(true);
sessionInfo = new SessionInfo(connectionInfo, 1);
producerInfo = new ProducerInfo(sessionInfo, 1);
producerInfo.setResponseRequired(false);
if (connector != null) {
brokerInfo = connector.getBrokerInfo();
} else {
brokerInfo = new BrokerInfo();
}
brokerInfo.setBrokerName(broker.getBrokerName());
brokerInfo.setPeerBrokerInfos(broker.getBroker().getPeerBrokerInfos());
brokerInfo.setSlaveBroker(true);
brokerInfo.setPassiveSlave(broker.isPassiveSlave());
restartBridge();
LOG.info("Slave connection between " + localBroker + " and " + remoteBroker + " has been established.");
}
public void stop() throws Exception {
if (!started.compareAndSet(true, false)||!masterActive.get()) {
return;
}
masterActive.set(false);
try {
// if (connectionInfo!=null){
// localBroker.request(connectionInfo.createRemoveCommand());
// }
// localBroker.setTransportListener(null);
// remoteBroker.setTransportListener(null);
remoteBroker.oneway(new ShutdownInfo());
localBroker.oneway(new ShutdownInfo());
} catch (IOException e) {
LOG.debug("Caught exception stopping", e);
} finally {
ServiceStopper ss = new ServiceStopper();
ss.stop(localBroker);
ss.stop(remoteBroker);
ss.throwFirstException();
}
}
public void stopBeforeConnected()throws Exception{
masterActive.set(false);
started.set(false);
stoppedBeforeStart.set(true);
ServiceStopper ss = new ServiceStopper();
ss.stop(localBroker);
ss.stop(remoteBroker);
}
protected void serviceRemoteException(IOException error) {
LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error);
shutDown();
}
protected void serviceRemoteCommand(Command command) {
try {
if (command.isMessageDispatch()) {
MessageDispatch md = (MessageDispatch)command;
command = md.getMessage();
}
if (command.getDataStructureType() == CommandTypes.SHUTDOWN_INFO) {
LOG.warn("The Master has shutdown");
shutDown();
} else {
boolean responseRequired = command.isResponseRequired();
int commandId = command.getCommandId();
if (responseRequired) {
Response response = (Response)localBroker.request(command);
response.setCorrelationId(commandId);
remoteBroker.oneway(response);
} else {
localBroker.oneway(command);
}
}
} catch (IOException e) {
serviceRemoteException(e);
}
}
protected void serviceLocalException(Throwable error) {
if (!(error instanceof TransportDisposedIOException) || localBroker.isDisposed()){
LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error);
ServiceSupport.dispose(this);
}else{
LOG.info(error.getMessage());
}
}
/**
* @return Returns the localURI.
*/
public URI getLocalURI() {
return localURI;
}
/**
* @param localURI The localURI to set.
*/
public void setLocalURI(URI localURI) {
this.localURI = localURI;
}
/**
* @return Returns the remoteURI.
*/
public URI getRemoteURI() {
return remoteURI;
}
/**
* @param remoteURI The remoteURI to set.
*/
public void setRemoteURI(URI remoteURI) {
this.remoteURI = remoteURI;
}
/**
* @return Returns the password.
*/
public String getPassword() {
return password;
}
/**
* @param password The password to set.
*/
public void setPassword(String password) {
this.password = password;
}
/**
* @return Returns the userName.
*/
public String getUserName() {
return userName;
}
/**
* @param userName The userName to set.
*/
public void setUserName(String userName) {
this.userName = userName;
}
private void shutDown() {
masterActive.set(false);
broker.masterFailed();
ServiceSupport.dispose(this);
}
public boolean isStoppedBeforeStart() {
return stoppedBeforeStart.get();
}
/**
* Get the failedToStart
* @return the failedToStart
*/
public boolean isFailedToStart() {
return this.failedToStart;
}
/**
* Set the failedToStart
* @param failedToStart the failedToStart to set
*/
public void setFailedToStart(boolean failedToStart) {
this.failedToStart = failedToStart;
}
}

View File

@ -1,25 +0,0 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<html>
<head>
</head>
<body>
Helper classes for implementing fault tolerance
</body>
</html>

View File

@ -1,36 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.jmx;
import org.apache.activemq.broker.ft.MasterConnector;
public class FTConnectorView implements FTConnectorViewMBean {
private final MasterConnector connector;
public FTConnectorView(MasterConnector connector) {
this.connector = connector;
}
public void start() throws Exception {
connector.start();
}
public void stop() throws Exception {
connector.stop();
}
}

View File

@ -1,23 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.jmx;
import org.apache.activemq.Service;
public interface FTConnectorViewMBean extends Service {
}