git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@691206 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-09-02 11:48:21 +00:00
parent 2624decb4e
commit d14e4aee1c
6 changed files with 71 additions and 8 deletions

View File

@ -102,7 +102,7 @@ import org.apache.commons.logging.LogFactory;
* @version $Revision: 1.1 $ * @version $Revision: 1.1 $
*/ */
public class BrokerService implements Service { public class BrokerService implements Service {
protected CountDownLatch slaveStartSignal = new CountDownLatch(1);
public static final String DEFAULT_PORT = "61616"; public static final String DEFAULT_PORT = "61616";
public static final String LOCAL_HOST_NAME; public static final String LOCAL_HOST_NAME;
public static final String DEFAULT_BROKER_NAME = "localhost"; public static final String DEFAULT_BROKER_NAME = "localhost";
@ -117,6 +117,8 @@ public class BrokerService implements Service {
private boolean useShutdownHook = true; private boolean useShutdownHook = true;
private boolean useLoggingForShutdownErrors; private boolean useLoggingForShutdownErrors;
private boolean shutdownOnMasterFailure; private boolean shutdownOnMasterFailure;
private boolean shutdownOnSlaveFailure;
private boolean waitForSlave;
private String brokerName = DEFAULT_BROKER_NAME; private String brokerName = DEFAULT_BROKER_NAME;
private File dataDirectoryFile; private File dataDirectoryFile;
private File tmpDataDirectory; private File tmpDataDirectory;
@ -1820,6 +1822,19 @@ public class BrokerService implements Service {
return context; return context;
} }
protected void waitForSlave(){
try {
slaveStartSignal.await();
}catch(InterruptedException e){
LOG.error("Exception waiting for slave:"+e);
}
}
protected void slaveConnectionEstablished(){
slaveStartSignal.countDown();
}
/** /**
* Start all transport and network connections, proxies and bridges * Start all transport and network connections, proxies and bridges
* *
@ -1847,7 +1862,9 @@ public class BrokerService implements Service {
map.put("network", "true"); map.put("network", "true");
map.put("async", "false"); map.put("async", "false");
uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map)); uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
if(isWaitForSlave()){
waitForSlave();
}
for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) { for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
NetworkConnector connector = iter.next(); NetworkConnector connector = iter.next();
connector.setLocalUri(uri); connector.setLocalUri(uri);
@ -1984,4 +2001,24 @@ public class BrokerService implements Service {
this.sslContext = sslContext; this.sslContext = sslContext;
} }
public boolean isShutdownOnSlaveFailure() {
return shutdownOnSlaveFailure;
}
public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) {
this.shutdownOnSlaveFailure = shutdownOnSlaveFailure;
}
public boolean isWaitForSlave() {
return waitForSlave;
}
public void setWaitForSlave(boolean waitForSlave) {
this.waitForSlave = waitForSlave;
}
public CountDownLatch getSlaveStartSignal() {
return slaveStartSignal;
}
} }

View File

@ -87,6 +87,7 @@ import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.MarshallingSupport; import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.util.URISupport; import org.apache.activemq.util.URISupport;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -208,6 +209,20 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
} }
public void serviceTransportException(IOException e) { public void serviceTransportException(IOException e) {
BrokerService bService=connector.getBrokerService();
if(bService.isShutdownOnSlaveFailure()){
if(brokerInfo!=null){
if(brokerInfo.isSlaveBroker()){
LOG.error("Slave has exception: " + e.getMessage()+" shutting down master now.", e);
try {
broker.stop();
bService.stop();
}catch(Exception ex){
LOG.warn("Failed to stop the master",ex);
}
}
}
}
if (!stopping.get()) { if (!stopping.get()) {
transportException.set(e); transportException.set(e);
if (TRANSPORTLOG.isDebugEnabled()) { if (TRANSPORTLOG.isDebugEnabled()) {
@ -601,7 +616,11 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
} }
public Response processAddConnection(ConnectionInfo info) throws Exception { public Response processAddConnection(ConnectionInfo info) throws Exception {
//if the broker service has slave attached, wait for the slave to be attached to allow client connection. slave connection is fine
if(!info.isBrokerMasterConnector()&&connector.getBrokerService().isWaitForSlave()&&connector.getBrokerService().getSlaveStartSignal().getCount()==1){
ServiceSupport.dispose(transport);
return new ExceptionResponse(new Exception("Master's slave not attached yet."));
}
// Older clients should have been defaulting this field to true.. but they were not. // Older clients should have been defaulting this field to true.. but they were not.
if( wireFormatInfo!=null && wireFormatInfo.getVersion() <= 2 ) { if( wireFormatInfo!=null && wireFormatInfo.getVersion() <= 2 ) {
info.setClientMaster(true); info.setClientMaster(true);
@ -1129,6 +1148,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
masterBroker = new MasterBroker(parent, transport); masterBroker = new MasterBroker(parent, transport);
masterBroker.startProcessing(); masterBroker.startProcessing();
LOG.info("Slave Broker " + info.getBrokerName() + " is attached"); LOG.info("Slave Broker " + info.getBrokerName() + " is attached");
BrokerService bService=connector.getBrokerService();
bService.slaveConnectionEstablished();
} else if (info.isNetworkConnection() && info.isDuplexConnection()) { } else if (info.isNetworkConnection() && info.isDuplexConnection()) {
// so this TransportConnection is the rear end of a network bridge // so this TransportConnection is the rear end of a network bridge
// We have been requested to create a two way pipe ... // We have been requested to create a two way pipe ...

View File

@ -394,4 +394,8 @@ public class TransportConnector implements Connector, BrokerServiceAware {
public Broker getBroker() { public Broker getBroker() {
return broker; return broker;
} }
public BrokerService getBrokerService() {
return brokerService;
}
} }

View File

@ -139,7 +139,7 @@ public class MasterConnector implements Service, BrokerServiceAware {
} catch (Exception e) { } catch (Exception e) {
masterActive.set(false); masterActive.set(false);
LOG.error("Failed to start network bridge: " + e, e); LOG.error("Failed to start network bridge: " + e, e);
} }
} }
protected void startBridge() throws Exception { protected void startBridge() throws Exception {
@ -148,10 +148,8 @@ public class MasterConnector implements Service, BrokerServiceAware {
connectionInfo.setClientId(idGenerator.generateId()); connectionInfo.setClientId(idGenerator.generateId());
connectionInfo.setUserName(userName); connectionInfo.setUserName(userName);
connectionInfo.setPassword(password); connectionInfo.setPassword(password);
connectionInfo.setBrokerMasterConnector(true);
localBroker.oneway(connectionInfo); localBroker.oneway(connectionInfo);
ConnectionInfo remoteInfo = new ConnectionInfo();
connectionInfo.copy(remoteInfo);
remoteInfo.setBrokerMasterConnector(true);
remoteBroker.oneway(connectionInfo); remoteBroker.oneway(connectionInfo);
sessionInfo = new SessionInfo(connectionInfo, 1); sessionInfo = new SessionInfo(connectionInfo, 1);
localBroker.oneway(sessionInfo); localBroker.oneway(sessionInfo);

View File

@ -392,7 +392,7 @@ public abstract class AbstractRegion implements Region {
try { try {
context.getBroker().addDestination(context, destination); context.getBroker().addDestination(context, destination);
// dest = addDestination(context, destination); dest = addDestination(context, destination);
} catch (DestinationAlreadyExistsException e) { } catch (DestinationAlreadyExistsException e) {
// if the destination already exists then lets ignore // if the destination already exists then lets ignore
// this error // this error

View File

@ -813,6 +813,8 @@ consume a given message
<xs:attribute name='producerSystemUsagePortion' type='xs:integer'/> <xs:attribute name='producerSystemUsagePortion' type='xs:integer'/>
<xs:attribute name='regionBroker' type='xs:string'/> <xs:attribute name='regionBroker' type='xs:string'/>
<xs:attribute name='shutdownOnMasterFailure' type='xs:boolean'/> <xs:attribute name='shutdownOnMasterFailure' type='xs:boolean'/>
<xs:attribute name='shutdownOnSlaveFailure' type='xs:boolean'/>
<xs:attribute name='waitForSlave' type='xs:boolean'/>
<xs:attribute name='splitSystemUsageForProducersConsumers' type='xs:boolean'/> <xs:attribute name='splitSystemUsageForProducersConsumers' type='xs:boolean'/>
<xs:attribute name='sslContext' type='xs:string'/> <xs:attribute name='sslContext' type='xs:string'/>
<xs:attribute name='start' type='xs:boolean'> <xs:attribute name='start' type='xs:boolean'>