git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@813906 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2009-09-11 16:16:08 +00:00
parent b7179731ff
commit 42282810a8
5 changed files with 192 additions and 165 deletions

View File

@ -111,6 +111,7 @@ public class BrokerService implements Service {
private boolean shutdownOnMasterFailure; private boolean shutdownOnMasterFailure;
private boolean shutdownOnSlaveFailure; private boolean shutdownOnSlaveFailure;
private boolean waitForSlave; private boolean waitForSlave;
private boolean passiveSlave;
private String brokerName = DEFAULT_BROKER_NAME; private String brokerName = DEFAULT_BROKER_NAME;
private File dataDirectoryFile; private File dataDirectoryFile;
private File tmpDataDirectory; private File tmpDataDirectory;
@ -1551,7 +1552,7 @@ public class BrokerService implements Service {
getManagementContext().unregisterMBean(objectName); getManagementContext().unregisterMBean(objectName);
} catch (Throwable e) { } catch (Throwable e) {
throw IOExceptionSupport.create( throw IOExceptionSupport.create(
"Transport Connector could not be registered in JMX: " + e.getMessage(), e); "Transport Connector could not be unregistered in JMX: " + e.getMessage(), e);
} }
} }
} }
@ -2070,5 +2071,21 @@ public class BrokerService implements Service {
return slaveStartSignal; return slaveStartSignal;
} }
/**
* Get the passiveSlave
* @return the passiveSlave
*/
public boolean isPassiveSlave() {
return this.passiveSlave;
}
/**
* Set the passiveSlave
* @param passiveSlave the passiveSlave to set
*/
public void setPassiveSlave(boolean passiveSlave) {
this.passiveSlave = passiveSlave;
}
} }

View File

@ -91,17 +91,13 @@ import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.util.URISupport; import org.apache.activemq.util.URISupport;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
/** /**
* @version $Revision: 1.8 $ * @version $Revision: 1.8 $
*/ */
public class TransportConnection implements Connection, Task, CommandVisitor { public class TransportConnection implements Connection, Task, CommandVisitor {
private static final Log LOG = LogFactory.getLog(TransportConnection.class); private static final Log LOG = LogFactory.getLog(TransportConnection.class);
private static final Log TRANSPORTLOG = LogFactory.getLog(TransportConnection.class.getName() private static final Log TRANSPORTLOG = LogFactory.getLog(TransportConnection.class.getName() + ".Transport");
+ ".Transport");
private static final Log SERVICELOG = LogFactory.getLog(TransportConnection.class.getName() + ".Service"); private static final Log SERVICELOG = LogFactory.getLog(TransportConnection.class.getName() + ".Service");
// Keeps track of the broker and connector that created this connection. // Keeps track of the broker and connector that created this connection.
protected final Broker broker; protected final Broker broker;
protected final TransportConnector connector; protected final TransportConnector connector;
@ -115,7 +111,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
protected TaskRunner taskRunner; protected TaskRunner taskRunner;
protected final AtomicReference<IOException> transportException = new AtomicReference<IOException>(); protected final AtomicReference<IOException> transportException = new AtomicReference<IOException>();
protected AtomicBoolean dispatchStopped = new AtomicBoolean(false); protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
private MasterBroker masterBroker; private MasterBroker masterBroker;
private final Transport transport; private final Transport transport;
private MessageAuthorizationPolicy messageAuthorizationPolicy; private MessageAuthorizationPolicy messageAuthorizationPolicy;
@ -147,23 +142,22 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
private DemandForwardingBridge duplexBridge; private DemandForwardingBridge duplexBridge;
private final TaskRunnerFactory taskRunnerFactory; private final TaskRunnerFactory taskRunnerFactory;
private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister(); private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock(); private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
/** /**
* @param connector * @param connector
* @param transport * @param transport
* @param broker * @param broker
* @param taskRunnerFactory - can be null if you want direct dispatch to the * @param taskRunnerFactory
* transport else commands are sent async. * - can be null if you want direct dispatch to the transport
* else commands are sent async.
*/ */
public TransportConnection(TransportConnector connector, final Transport transport, Broker broker, public TransportConnection(TransportConnector connector, final Transport transport, Broker broker,
TaskRunnerFactory taskRunnerFactory) { TaskRunnerFactory taskRunnerFactory) {
this.connector = connector; this.connector = connector;
this.broker = broker; this.broker = broker;
this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy(); this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
RegionBroker rb = (RegionBroker)broker.getAdaptor(RegionBroker.class); RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
brokerConnectionStates = rb.getConnectionStates(); brokerConnectionStates = rb.getConnectionStates();
if (connector != null) { if (connector != null) {
this.statistics.setParent(connector.getStatistics()); this.statistics.setParent(connector.getStatistics());
@ -171,14 +165,13 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
this.taskRunnerFactory = taskRunnerFactory; this.taskRunnerFactory = taskRunnerFactory;
this.transport = transport; this.transport = transport;
this.transport.setTransportListener(new DefaultTransportListener() { this.transport.setTransportListener(new DefaultTransportListener() {
public void onCommand(Object o) { public void onCommand(Object o) {
serviceLock.readLock().lock(); serviceLock.readLock().lock();
try { try {
if (!(o instanceof Command)) { if (!(o instanceof Command)) {
throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString()); throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString());
} }
Command command = (Command)o; Command command = (Command) o;
Response response = service(command); Response response = service(command);
if (response != null) { if (response != null) {
dispatchSync(response); dispatchSync(response);
@ -206,22 +199,22 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
* @return size of dispatch queue * @return size of dispatch queue
*/ */
public int getDispatchQueueSize() { public int getDispatchQueueSize() {
synchronized(dispatchQueue) { synchronized (dispatchQueue) {
return dispatchQueue.size(); return dispatchQueue.size();
} }
} }
public void serviceTransportException(IOException e) { public void serviceTransportException(IOException e) {
BrokerService bService=connector.getBrokerService(); BrokerService bService = connector.getBrokerService();
if(bService.isShutdownOnSlaveFailure()){ if (bService.isShutdownOnSlaveFailure()) {
if(brokerInfo!=null){ if (brokerInfo != null) {
if(brokerInfo.isSlaveBroker()){ if (brokerInfo.isSlaveBroker()) {
LOG.error("Slave has exception: " + e.getMessage()+" shutting down master now.", e); LOG.error("Slave has exception: " + e.getMessage() + " shutting down master now.", e);
try { try {
doStop(); doStop();
bService.stop(); bService.stop();
}catch(Exception ex){ } catch (Exception ex) {
LOG.warn("Failed to stop the master",ex); LOG.warn("Failed to stop the master", ex);
} }
} }
} }
@ -258,21 +251,17 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
* error transmitted to the client before stopping it's transport. * error transmitted to the client before stopping it's transport.
*/ */
public void serviceException(Throwable e) { public void serviceException(Throwable e) {
// are we a transport exception such as not being able to dispatch // are we a transport exception such as not being able to dispatch
// synchronously to a transport // synchronously to a transport
if (e instanceof IOException) { if (e instanceof IOException) {
serviceTransportException((IOException)e); serviceTransportException((IOException) e);
} else if (e.getClass() == BrokerStoppedException.class) { } else if (e.getClass() == BrokerStoppedException.class) {
// Handle the case where the broker is stopped // Handle the case where the broker is stopped
// But the client is still connected. // But the client is still connected.
if (!stopping.get()) { if (!stopping.get()) {
if (SERVICELOG.isDebugEnabled()) { if (SERVICELOG.isDebugEnabled()) {
SERVICELOG SERVICELOG.debug("Broker has been stopped. Notifying client and closing his connection.");
.debug("Broker has been stopped. Notifying client and closing his connection.");
} }
ConnectionError ce = new ConnectionError(); ConnectionError ce = new ConnectionError();
ce.setException(e); ce.setException(e);
dispatchSync(ce); dispatchSync(ce);
@ -308,8 +297,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
response = command.visit(this); response = command.visit(this);
} catch (Throwable e) { } catch (Throwable e) {
if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) { if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
SERVICELOG.debug("Error occured while processing " SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async")
+ (responseRequired ? "sync": "async")
+ " command: " + command + ", exception: " + e, e); + " command: " + command + ", exception: " + e, e);
} }
if (responseRequired) { if (responseRequired) {
@ -466,8 +454,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull); return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull);
} }
public Response processMessageDispatchNotification(MessageDispatchNotification notification) public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception {
throws Exception {
broker.processDispatchNotification(notification); broker.processDispatchNotification(notification);
return null; return null;
} }
@ -496,8 +483,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
TransportConnectionState cs = lookupConnectionState(connectionId); TransportConnectionState cs = lookupConnectionState(connectionId);
SessionState ss = cs.getSessionState(sessionId); SessionState ss = cs.getSessionState(sessionId);
if (ss == null) { if (ss == null) {
throw new IllegalStateException( throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "
"Cannot add a producer to a session that had not been registered: "
+ sessionId); + sessionId);
} }
// Avoid replaying dup commands // Avoid replaying dup commands
@ -518,8 +504,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
TransportConnectionState cs = lookupConnectionState(connectionId); TransportConnectionState cs = lookupConnectionState(connectionId);
SessionState ss = cs.getSessionState(sessionId); SessionState ss = cs.getSessionState(sessionId);
if (ss == null) { if (ss == null) {
throw new IllegalStateException( throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: "
"Cannot remove a producer from a session that had not been registered: "
+ sessionId); + sessionId);
} }
ProducerState ps = ss.removeProducer(id); ProducerState ps = ss.removeProducer(id);
@ -537,9 +522,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
TransportConnectionState cs = lookupConnectionState(connectionId); TransportConnectionState cs = lookupConnectionState(connectionId);
SessionState ss = cs.getSessionState(sessionId); SessionState ss = cs.getSessionState(sessionId);
if (ss == null) { if (ss == null) {
throw new IllegalStateException( throw new IllegalStateException(broker.getBrokerName()
broker.getBrokerName() + " Cannot add a consumer to a session that had not been registered: " + " Cannot add a consumer to a session that had not been registered: " + sessionId);
+ sessionId);
} }
// Avoid replaying dup commands // Avoid replaying dup commands
if (!ss.getConsumerIds().contains(info.getConsumerId())) { if (!ss.getConsumerIds().contains(info.getConsumerId())) {
@ -559,8 +543,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
TransportConnectionState cs = lookupConnectionState(connectionId); TransportConnectionState cs = lookupConnectionState(connectionId);
SessionState ss = cs.getSessionState(sessionId); SessionState ss = cs.getSessionState(sessionId);
if (ss == null) { if (ss == null) {
throw new IllegalStateException( throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: "
"Cannot remove a consumer from a session that had not been registered: "
+ sessionId); + sessionId);
} }
ConsumerState consumerState = ss.removeConsumer(id); ConsumerState consumerState = ss.removeConsumer(id);
@ -602,7 +585,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
session.shutdown(); session.shutdown();
// Cascade the connection stop to the consumers and producers. // Cascade the connection stop to the consumers and producers.
for (Iterator iter = session.getConsumerIds().iterator(); iter.hasNext();) { for (Iterator iter = session.getConsumerIds().iterator(); iter.hasNext();) {
ConsumerId consumerId = (ConsumerId)iter.next(); ConsumerId consumerId = (ConsumerId) iter.next();
try { try {
processRemoveConsumer(consumerId, lastDeliveredSequenceId); processRemoveConsumer(consumerId, lastDeliveredSequenceId);
} catch (Throwable e) { } catch (Throwable e) {
@ -610,7 +593,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
} }
} }
for (Iterator iter = session.getProducerIds().iterator(); iter.hasNext();) { for (Iterator iter = session.getProducerIds().iterator(); iter.hasNext();) {
ProducerId producerId = (ProducerId)iter.next(); ProducerId producerId = (ProducerId) iter.next();
try { try {
processRemoveProducer(producerId); processRemoveProducer(producerId);
} catch (Throwable e) { } catch (Throwable e) {
@ -623,29 +606,29 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
} }
public Response processAddConnection(ConnectionInfo info) throws Exception { public Response processAddConnection(ConnectionInfo info) throws Exception {
//if the broker service has slave attached, wait for the slave to be attached to allow client connection. slave connection is fine // if the broker service has slave attached, wait for the slave to be
if(!info.isBrokerMasterConnector()&&connector.getBrokerService().isWaitForSlave()&&connector.getBrokerService().getSlaveStartSignal().getCount()==1){ // attached to allow client connection. slave connection is fine
if (!info.isBrokerMasterConnector() && connector.getBrokerService().isWaitForSlave()
&& connector.getBrokerService().getSlaveStartSignal().getCount() == 1) {
ServiceSupport.dispose(transport); ServiceSupport.dispose(transport);
return new ExceptionResponse(new Exception("Master's slave not attached yet.")); return new ExceptionResponse(new Exception("Master's slave not attached yet."));
} }
// Older clients should have been defaulting this field to true.. but they were not. // Older clients should have been defaulting this field to true.. but
if( wireFormatInfo!=null && wireFormatInfo.getVersion() <= 2 ) { // they were not.
if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
info.setClientMaster(true); info.setClientMaster(true);
} }
TransportConnectionState state; TransportConnectionState state;
// Make sure 2 concurrent connections by the same ID only generate 1 // Make sure 2 concurrent connections by the same ID only generate 1
// TransportConnectionState object. // TransportConnectionState object.
synchronized (brokerConnectionStates) { synchronized (brokerConnectionStates) {
state = (TransportConnectionState)brokerConnectionStates.get(info.getConnectionId()); state = (TransportConnectionState) brokerConnectionStates.get(info.getConnectionId());
if (state == null) { if (state == null) {
state = new TransportConnectionState(info, this); state = new TransportConnectionState(info, this);
brokerConnectionStates.put(info.getConnectionId(), state); brokerConnectionStates.put(info.getConnectionId(), state);
} }
state.incrementReference(); state.incrementReference();
} }
// If there are 2 concurrent connections for the same connection id, // If there are 2 concurrent connections for the same connection id,
// then last one in wins, we need to sync here // then last one in wins, we need to sync here
// to figure out the winner. // to figure out the winner.
@ -659,9 +642,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
state.reset(info); state.reset(info);
} }
} }
registerConnectionState(info.getConnectionId(), state); registerConnectionState(info.getConnectionId(), state);
LOG.debug("Setting up new connection: " + getRemoteAddress()); LOG.debug("Setting up new connection: " + getRemoteAddress());
// Setup the context. // Setup the context.
String clientId = info.getClientId(); String clientId = info.getClientId();
@ -681,12 +662,11 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
this.manageable = info.isManageable(); this.manageable = info.isManageable();
state.setContext(context); state.setContext(context);
state.setConnection(this); state.setConnection(this);
try { try {
broker.addConnection(context, info); broker.addConnection(context, info);
}catch(Exception e){ } catch (Exception e) {
brokerConnectionStates.remove(info); brokerConnectionStates.remove(info);
LOG.warn("Failed to add Connection",e); LOG.warn("Failed to add Connection", e);
throw e; throw e;
} }
if (info.isManageable() && broker.isFaultTolerantConfiguration()) { if (info.isManageable() && broker.isFaultTolerantConfiguration()) {
@ -698,16 +678,17 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return null; return null;
} }
public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws InterruptedException { public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId)
throws InterruptedException {
TransportConnectionState cs = lookupConnectionState(id); TransportConnectionState cs = lookupConnectionState(id);
if (cs != null) { if (cs != null) {
// Don't allow things to be added to the connection state while we are // Don't allow things to be added to the connection state while we
// are
// shutting down. // shutting down.
cs.shutdown(); cs.shutdown();
// Cascade the connection stop to the sessions. // Cascade the connection stop to the sessions.
for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) { for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) {
SessionId sessionId = (SessionId)iter.next(); SessionId sessionId = (SessionId) iter.next();
try { try {
processRemoveSession(sessionId, lastDeliveredSequenceId); processRemoveSession(sessionId, lastDeliveredSequenceId);
} catch (Throwable e) { } catch (Throwable e) {
@ -716,7 +697,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
} }
// Cascade the connection stop to temp destinations. // Cascade the connection stop to temp destinations.
for (Iterator iter = cs.getTempDesinations().iterator(); iter.hasNext();) { for (Iterator iter = cs.getTempDesinations().iterator(); iter.hasNext();) {
DestinationInfo di = (DestinationInfo)iter.next(); DestinationInfo di = (DestinationInfo) iter.next();
try { try {
broker.removeDestination(cs.getContext(), di.getDestination(), 0); broker.removeDestination(cs.getContext(), di.getDestination(), 0);
} catch (Throwable e) { } catch (Throwable e) {
@ -729,7 +710,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
} catch (Throwable e) { } catch (Throwable e) {
SERVICELOG.warn("Failed to remove connection " + cs.getInfo(), e); SERVICELOG.warn("Failed to remove connection " + cs.getInfo(), e);
} }
TransportConnectionState state = unregisterConnectionState(id); TransportConnectionState state = unregisterConnectionState(id);
if (state != null) { if (state != null) {
synchronized (brokerConnectionStates) { synchronized (brokerConnectionStates) {
@ -754,7 +734,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
} }
public void dispatchSync(Command message) { public void dispatchSync(Command message) {
//getStatistics().getEnqueues().increment(); // getStatistics().getEnqueues().increment();
try { try {
processDispatch(message); processDispatch(message);
} catch (IOException e) { } catch (IOException e) {
@ -764,11 +744,11 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
public void dispatchAsync(Command message) { public void dispatchAsync(Command message) {
if (!stopping.get()) { if (!stopping.get()) {
//getStatistics().getEnqueues().increment(); // getStatistics().getEnqueues().increment();
if (taskRunner == null) { if (taskRunner == null) {
dispatchSync(message); dispatchSync(message);
} else { } else {
synchronized(dispatchQueue) { synchronized (dispatchQueue) {
dispatchQueue.add(message); dispatchQueue.add(message);
} }
try { try {
@ -779,7 +759,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
} }
} else { } else {
if (message.isMessageDispatch()) { if (message.isMessageDispatch()) {
MessageDispatch md = (MessageDispatch)message; MessageDispatch md = (MessageDispatch) message;
Runnable sub = md.getTransmitCallback(); Runnable sub = md.getTransmitCallback();
broker.postProcessDispatch(md); broker.postProcessDispatch(md);
if (sub != null) { if (sub != null) {
@ -790,8 +770,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
} }
protected void processDispatch(Command command) throws IOException { protected void processDispatch(Command command) throws IOException {
final MessageDispatch messageDispatch = (MessageDispatch)(command.isMessageDispatch() final MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null);
? command : null);
try { try {
if (!stopping.get()) { if (!stopping.get()) {
if (messageDispatch != null) { if (messageDispatch != null) {
@ -807,7 +786,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
sub.run(); sub.run();
} }
} }
//getStatistics().getDequeues().increment(); // getStatistics().getDequeues().increment();
} }
} }
@ -825,10 +804,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
} }
return false; return false;
} }
if (!dispatchStopped.get()) { if (!dispatchStopped.get()) {
Command command = null; Command command = null;
synchronized(dispatchQueue) { synchronized (dispatchQueue) {
if (dispatchQueue.isEmpty()) { if (dispatchQueue.isEmpty()) {
return false; return false;
} }
@ -838,7 +816,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return true; return true;
} }
return false; return false;
} catch (IOException e) { } catch (IOException e) {
if (dispatchStopped.compareAndSet(false, true)) { if (dispatchStopped.compareAndSet(false, true)) {
dispatchStoppedLatch.countDown(); dispatchStoppedLatch.countDown();
@ -870,7 +847,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
public void start() throws Exception { public void start() throws Exception {
starting = true; starting = true;
try { try {
synchronized(this) { synchronized (this) {
if (taskRunnerFactory != null) { if (taskRunnerFactory != null) {
taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: " taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
+ getRemoteAddress()); + getRemoteAddress());
@ -878,7 +855,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
taskRunner = null; taskRunner = null;
} }
transport.start(); transport.start();
active = true; active = true;
dispatchAsync(connector.getBrokerInfo()); dispatchAsync(connector.getBrokerInfo());
connector.onStarted(this); connector.onStarted(this);
@ -898,6 +874,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
} }
} }
} }
public void stop() throws Exception { public void stop() throws Exception {
synchronized (this) { synchronized (this) {
pendingStop = true; pendingStop = true;
@ -907,8 +884,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
} }
} }
stopAsync(); stopAsync();
while( !stopped.await(5, TimeUnit.SECONDS) ) { while (!stopped.await(5, TimeUnit.SECONDS)) {
LOG.info("The connection to '" + transport.getRemoteAddress()+ "' is taking a long time to shutdown."); LOG.info("The connection to '" + transport.getRemoteAddress() + "' is taking a long time to shutdown.");
} }
} }
@ -916,22 +893,21 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
// If we're in the middle of starting // If we're in the middle of starting
// then go no further... for now. // then go no further... for now.
if (stopping.compareAndSet(false, true)) { if (stopping.compareAndSet(false, true)) {
// Let all the connection contexts know we are shutting down // Let all the connection contexts know we are shutting down
// so that in progress operations can notice and unblock. // so that in progress operations can notice and unblock.
List<TransportConnectionState> connectionStates = listConnectionStates(); List<TransportConnectionState> connectionStates = listConnectionStates();
for (TransportConnectionState cs : connectionStates) { for (TransportConnectionState cs : connectionStates) {
cs.getContext().getStopping().set(true); cs.getContext().getStopping().set(true);
} }
new Thread("ActiveMQ Transport Stopper: " + transport.getRemoteAddress()) {
new Thread("ActiveMQ Transport Stopper: "+ transport.getRemoteAddress()) {
@Override @Override
public void run() { public void run() {
serviceLock.writeLock().lock(); serviceLock.writeLock().lock();
try { try {
doStop(); doStop();
} catch (Throwable e) { } catch (Throwable e) {
LOG.debug("Error occured while shutting down a connection to '" + transport.getRemoteAddress()+ "': ", e); LOG.debug("Error occured while shutting down a connection to '" + transport.getRemoteAddress()
+ "': ", e);
} finally { } finally {
stopped.countDown(); stopped.countDown();
serviceLock.writeLock().unlock(); serviceLock.writeLock().unlock();
@ -943,7 +919,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
@Override @Override
public String toString() { public String toString() {
return "Transport Connection to: "+transport.getRemoteAddress(); return "Transport Connection to: " + transport.getRemoteAddress();
} }
protected void doStop() throws Exception, InterruptedException { protected void doStop() throws Exception, InterruptedException {
@ -958,31 +934,26 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
duplexBridge.stop(); duplexBridge.stop();
} }
} }
} catch (Exception ignore) { } catch (Exception ignore) {
LOG.trace("Exception caught stopping", ignore); LOG.trace("Exception caught stopping", ignore);
} }
try { try {
transport.stop(); transport.stop();
LOG.debug("Stopped transport: " + transport.getRemoteAddress()); LOG.debug("Stopped transport: " + transport.getRemoteAddress());
} catch (Exception e) { } catch (Exception e) {
LOG.debug("Could not stop transport: " + e, e); LOG.debug("Could not stop transport: " + e, e);
} }
if (taskRunner != null) { if (taskRunner != null) {
taskRunner.shutdown(1); taskRunner.shutdown(1);
} }
active = false; active = false;
// Run the MessageDispatch callbacks so that message references get // Run the MessageDispatch callbacks so that message references get
// cleaned up. // cleaned up.
synchronized(dispatchQueue) { synchronized (dispatchQueue) {
for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext();) { for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext();) {
Command command = iter.next(); Command command = iter.next();
if (command.isMessageDispatch()) { if (command.isMessageDispatch()) {
MessageDispatch md = (MessageDispatch)command; MessageDispatch md = (MessageDispatch) command;
Runnable sub = md.getTransmitCallback(); Runnable sub = md.getTransmitCallback();
broker.postProcessDispatch(md); broker.postProcessDispatch(md);
if (sub != null) { if (sub != null) {
@ -995,9 +966,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
// //
// Remove all logical connection associated with this connection // Remove all logical connection associated with this connection
// from the broker. // from the broker.
if (!broker.isStopped()) { if (!broker.isStopped()) {
List<TransportConnectionState> connectionStates = listConnectionStates(); List<TransportConnectionState> connectionStates = listConnectionStates();
connectionStates = listConnectionStates(); connectionStates = listConnectionStates();
for (TransportConnectionState cs : connectionStates) { for (TransportConnectionState cs : connectionStates) {
@ -1009,7 +978,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
ignore.printStackTrace(); ignore.printStackTrace();
} }
} }
if (brokerInfo != null) { if (brokerInfo != null) {
broker.removeBroker(this, brokerInfo); broker.removeBroker(this, brokerInfo);
} }
@ -1025,7 +993,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
} }
/** /**
* @param blockedCandidate The blockedCandidate to set. * @param blockedCandidate
* The blockedCandidate to set.
*/ */
public void setBlockedCandidate(boolean blockedCandidate) { public void setBlockedCandidate(boolean blockedCandidate) {
this.blockedCandidate = blockedCandidate; this.blockedCandidate = blockedCandidate;
@ -1039,7 +1008,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
} }
/** /**
* @param markedCandidate The markedCandidate to set. * @param markedCandidate
* The markedCandidate to set.
*/ */
public void setMarkedCandidate(boolean markedCandidate) { public void setMarkedCandidate(boolean markedCandidate) {
this.markedCandidate = markedCandidate; this.markedCandidate = markedCandidate;
@ -1050,7 +1020,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
} }
/** /**
* @param slow The slow to set. * @param slow
* The slow to set.
*/ */
public void setSlow(boolean slow) { public void setSlow(boolean slow) {
this.slow = slow; this.slow = slow;
@ -1094,14 +1065,16 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
} }
/** /**
* @param blocked The blocked to set. * @param blocked
* The blocked to set.
*/ */
public void setBlocked(boolean blocked) { public void setBlocked(boolean blocked) {
this.blocked = blocked; this.blocked = blocked;
} }
/** /**
* @param connected The connected to set. * @param connected
* The connected to set.
*/ */
public void setConnected(boolean connected) { public void setConnected(boolean connected) {
this.connected = connected; this.connected = connected;
@ -1115,7 +1088,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
} }
/** /**
* @param active The active to set. * @param active
* The active to set.
*/ */
public void setActive(boolean active) { public void setActive(boolean active) {
this.active = active; this.active = active;
@ -1149,15 +1123,20 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
public Response processBrokerInfo(BrokerInfo info) { public Response processBrokerInfo(BrokerInfo info) {
if (info.isSlaveBroker()) { if (info.isSlaveBroker()) {
BrokerService bService = connector.getBrokerService();
// Do we only support passive slaves - or does the slave want to be
// passive ?
boolean passive = bService.isPassiveSlave() || info.isPassiveSlave();
if (passive == false) {
// stream messages from this broker (the master) to // stream messages from this broker (the master) to
// the slave // the slave
MutableBrokerFilter parent = (MutableBrokerFilter)broker.getAdaptor(MutableBrokerFilter.class); MutableBrokerFilter parent = (MutableBrokerFilter) broker.getAdaptor(MutableBrokerFilter.class);
masterBroker = new MasterBroker(parent, transport); masterBroker = new MasterBroker(parent, transport);
masterBroker.startProcessing(); masterBroker.startProcessing();
LOG.info("Slave Broker " + info.getBrokerName() + " is attached"); }
BrokerService bService=connector.getBrokerService(); LOG.info((passive?"Passive":"Active")+" Slave Broker " + info.getBrokerName() + " is attached");
bService.slaveConnectionEstablished(); bService.slaveConnectionEstablished();
} else if (info.isNetworkConnection() && info.isDuplexConnection()) { } else if (info.isNetworkConnection() && info.isDuplexConnection()) {
// so this TransportConnection is the rear end of a network bridge // so this TransportConnection is the rear end of a network bridge
// We have been requested to create a two way pipe ... // We have been requested to create a two way pipe ...
@ -1174,14 +1153,12 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map)); uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
Transport localTransport = TransportFactory.connect(uri); Transport localTransport = TransportFactory.connect(uri);
Transport remoteBridgeTransport = new ResponseCorrelator(transport); Transport remoteBridgeTransport = new ResponseCorrelator(transport);
duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport);
remoteBridgeTransport);
duplexBridge.setBrokerService(broker.getBrokerService()); duplexBridge.setBrokerService(broker.getBrokerService());
// now turn duplex off this side // now turn duplex off this side
info.setDuplexConnection(false); info.setDuplexConnection(false);
duplexBridge.setCreatedByDuplex(true); duplexBridge.setCreatedByDuplex(true);
duplexBridge.duplexStart(this,brokerInfo, info); duplexBridge.duplexStart(this, brokerInfo, info);
LOG.info("Created Duplex Bridge back to " + info.getBrokerName()); LOG.info("Created Duplex Bridge back to " + info.getBrokerName());
return null; return null;
} catch (Exception e) { } catch (Exception e) {
@ -1195,12 +1172,10 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
this.brokerInfo = info; this.brokerInfo = info;
broker.addBroker(this, info); broker.addBroker(this, info);
networkConnection = true; networkConnection = true;
List<TransportConnectionState> connectionStates = listConnectionStates(); List<TransportConnectionState> connectionStates = listConnectionStates();
for (TransportConnectionState cs : connectionStates) { for (TransportConnectionState cs : connectionStates) {
cs.getContext().setNetworkConnection(true); cs.getContext().setNetworkConnection(true);
} }
return null; return null;
} }
@ -1247,8 +1222,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
ProducerState producerState = ss.getProducerState(id); ProducerState producerState = ss.getProducerState(id);
if (producerState != null && producerState.getInfo() != null) { if (producerState != null && producerState.getInfo() != null) {
ProducerInfo info = producerState.getInfo(); ProducerInfo info = producerState.getInfo();
result.setMutable(info.getDestination() == null result.setMutable(info.getDestination() == null || info.getDestination().isComposite());
|| info.getDestination().isComposite());
} }
} }
producerExchanges.put(id, result); producerExchanges.put(id, result);
@ -1314,8 +1288,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
} }
public Response processConnectionControl(ConnectionControl control) throws Exception { public Response processConnectionControl(ConnectionControl control) throws Exception {
if(control != null) { if (control != null) {
faultTolerantConnection=control.isFaultTolerant(); faultTolerantConnection = control.isFaultTolerant();
} }
return null; return null;
} }
@ -1328,15 +1302,16 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return null; return null;
} }
protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId,TransportConnectionState state) { protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId,
TransportConnectionState state) {
TransportConnectionState cs = null; TransportConnectionState cs = null;
if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()){ if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()) {
//swap implementations // swap implementations
TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister(); TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister();
newRegister.intialize(connectionStateRegister); newRegister.intialize(connectionStateRegister);
connectionStateRegister = newRegister; connectionStateRegister = newRegister;
} }
cs= connectionStateRegister.registerConnectionState(connectionId, state); cs = connectionStateRegister.registerConnectionState(connectionId, state);
return cs; return cs;
} }
@ -1367,5 +1342,4 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
protected synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) { protected synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) {
return connectionStateRegister.lookupConnectionState(connectionId); return connectionStateRegister.lookupConnectionState(connectionId);
} }
} }

View File

@ -139,7 +139,7 @@ public class MasterConnector implements Service, BrokerServiceAware {
} }
public void onException(IOException error) { public void onException(IOException error) {
if (started.get()) { if (started.get() && remoteBroker.isDisposed()) {
serviceRemoteException(error); serviceRemoteException(error);
} }
} }
@ -206,6 +206,7 @@ public class MasterConnector implements Service, BrokerServiceAware {
brokerInfo.setBrokerName(broker.getBrokerName()); brokerInfo.setBrokerName(broker.getBrokerName());
brokerInfo.setPeerBrokerInfos(broker.getBroker().getPeerBrokerInfos()); brokerInfo.setPeerBrokerInfos(broker.getBroker().getPeerBrokerInfos());
brokerInfo.setSlaveBroker(true); brokerInfo.setSlaveBroker(true);
brokerInfo.setPassiveSlave(broker.isPassiveSlave());
restartBridge(); restartBridge();
LOG.info("Slave connection between " + localBroker + " and " + remoteBroker + " has been established."); LOG.info("Slave connection between " + localBroker + " and " + remoteBroker + " has been established.");
} }

View File

@ -16,7 +16,13 @@
*/ */
package org.apache.activemq.command; package org.apache.activemq.command;
import org.apache.activemq.plugin.StatisticsBrokerPlugin;
import org.apache.activemq.state.CommandVisitor; import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.util.Properties;
/** /**
* When a client connects to a broker, the broker send the client a BrokerInfo * When a client connects to a broker, the broker send the client a BrokerInfo
@ -28,6 +34,8 @@ import org.apache.activemq.state.CommandVisitor;
* @version $Revision: 1.7 $ * @version $Revision: 1.7 $
*/ */
public class BrokerInfo extends BaseCommand { public class BrokerInfo extends BaseCommand {
private static Log LOG = LogFactory.getLog(BrokerInfo.class);
private static final String PASSIVE_SLAVE_KEY = "passiveSlave";
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.BROKER_INFO; public static final byte DATA_STRUCTURE_TYPE = CommandTypes.BROKER_INFO;
BrokerId brokerId; BrokerId brokerId;
String brokerURL; String brokerURL;
@ -209,4 +217,33 @@ public class BrokerInfo extends BaseCommand {
public void setNetworkProperties(String networkProperties) { public void setNetworkProperties(String networkProperties) {
this.networkProperties = networkProperties; this.networkProperties = networkProperties;
} }
public boolean isPassiveSlave() {
boolean result = false;
Properties props = getProperties();
if (props != null) {
result = Boolean.parseBoolean(props.getProperty(PASSIVE_SLAVE_KEY, "false"));
}
return result;
}
public void setPassiveSlave(boolean value) {
Properties props = new Properties();
props.put(PASSIVE_SLAVE_KEY, Boolean.toString(value));
try {
this.networkProperties=MarshallingSupport.propertiesToString(props);
} catch (IOException e) {
LOG.error("Failed to marshall props to a String",e);
}
}
public Properties getProperties() {
Properties result = null;
try {
result = MarshallingSupport.stringToProperties(getNetworkProperties());
} catch (IOException e) {
LOG.error("Failed to marshall properties", e);
}
return result;
}
} }

View File

@ -25,10 +25,8 @@ import java.util.Iterator;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl; import org.apache.activemq.command.ConnectionControl;