Robert Davies 2009-09-10 13:19:52 +00:00
parent aead3e0fc0
commit 98497b1930
5 changed files with 199 additions and 37 deletions

View File

@ -31,7 +31,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionMetaData;
@ -386,7 +385,8 @@ public class BrokerService implements Service {
* @return true if this Broker is a slave to a Master
*/
public boolean isSlave() {
return masterConnector != null && masterConnector.isSlave();
return (masterConnector != null && masterConnector.isSlave()) ||
(masterConnector != null && masterConnector.isStoppedBeforeStart());
}
public void masterFailed() {
@ -420,7 +420,7 @@ public class BrokerService implements Service {
// Service interface
// -------------------------------------------------------------------------
public void start() throws Exception {
if (!started.compareAndSet(false, true)) {
if (stopped.get() || !started.compareAndSet(false, true)) {
// lets just ignore redundant start() calls
// as its way too easy to not be completely sure if start() has been
// called or not with the gazillion of different configuration
@ -467,8 +467,10 @@ public class BrokerService implements Service {
if (!isSlave()) {
startAllConnectors();
}
if (isUseJmx() && masterConnector != null) {
registerFTConnectorMBean(masterConnector);
if (!stopped.get()) {
if (isUseJmx() && masterConnector != null) {
registerFTConnectorMBean(masterConnector);
}
}
brokerId = broker.getBrokerId();
LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") started");
@ -477,7 +479,9 @@ public class BrokerService implements Service {
} catch (Exception e) {
LOG.error("Failed to start ActiveMQ JMS Message Broker. Reason: " + e, e);
try {
stop();
if (!stopped.get()) {
stop();
}
} catch (Exception ex) {
LOG.warn("Failed to stop broker after failure in start ", ex);
}
@ -517,6 +521,24 @@ public class BrokerService implements Service {
SelectorParser.clearCache();
stopped.set(true);
stoppedLatch.countDown();
if (masterConnectorURI == null) {
// master start has not finished yet
if (slaveStartSignal.getCount() == 1) {
started.set(false);
slaveStartSignal.countDown();
}
} else {
for (Service service : services) {
if (service instanceof MasterConnector) {
MasterConnector mConnector = (MasterConnector) service;
if (!mConnector.isSlave()) {
// means should be slave but not connected to master yet
started.set(false);
mConnector.stopBeforeConnected();
}
}
}
}
LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") stopped");
synchronized (shutdownHooks) {
for (Runnable hook : shutdownHooks) {
@ -529,6 +551,77 @@ public class BrokerService implements Service {
}
stopper.throwFirstException();
}
public boolean checkQueueSize(String queueName) {
long count = 0;
long queueSize = 0;
Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getDestinationMap();
for (Map.Entry<ActiveMQDestination, Destination> entry : destinationMap.entrySet()) {
if (entry.getKey().isQueue()) {
if (entry.getValue().getName().matches(queueName)) {
queueSize = entry.getValue().getDestinationStatistics().getMessages().getCount();
count = queueSize;
if (queueSize > 0) {
LOG.info("Queue has pending message:" + entry.getValue().getName() + " queueSize is:"
+ queueSize);
}
}
}
}
return count == 0;
}
/**
* This method (both connectorName and queueName are using regex to match)
* 1. stop the connector (supposed the user input the connector which the
* clients connect to) 2. to check whether there is any pending message on
* the queues defined by queueName 3. supposedly, after stop the connector,
* client should failover to other broker and pending messages should be
* forwarded. if no pending messages, the method finally call stop to stop
* the broker.
*
* @param connectorName
* @param queueName
* @param timeout
* @param pollInterval
* @throws Exception
*/
public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval)
throws Exception {
if (isUseJmx()) {
if (connectorName == null || queueName == null || timeout <= 0) {
throw new Exception(
"connectorName and queueName cannot be null and timeout should be >0 for stopGracefully.");
}
if (pollInterval <= 0) {
pollInterval = 30;
}
LOG.info("Stop gracefully with connectorName:" + connectorName + " queueName:" + queueName + " timeout:"
+ timeout + " pollInterval:" + pollInterval);
TransportConnector connector;
for (int i = 0; i < transportConnectors.size(); i++) {
connector = transportConnectors.get(i);
if (connector != null && connector.getName() != null && connector.getName().matches(connectorName)) {
connector.stop();
}
}
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < timeout * 1000) {
// check quesize until it gets zero
if (checkQueueSize(queueName)) {
stop();
break;
} else {
Thread.sleep(pollInterval * 1000);
}
}
if (stopped.get()) {
LOG.info("Successfully stop the broker.");
} else {
LOG.info("There is still pending message on the queue. Please check and stop the broker manually.");
}
}
}
/**
* A helper method to block the caller thread until the broker has been
@ -1828,24 +1921,26 @@ public class BrokerService implements Service {
if (isWaitForSlave()) {
waitForSlave();
}
for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
NetworkConnector connector = iter.next();
connector.setLocalUri(uri);
connector.setBrokerName(getBrokerName());
connector.setDurableDestinations(durableDestinations);
connector.start();
}
for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
ProxyConnector connector = iter.next();
connector.start();
}
for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
JmsConnector connector = iter.next();
connector.start();
}
for (Service service : services) {
configureService(service);
service.start();
if (!stopped.get()) {
for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
NetworkConnector connector = iter.next();
connector.setLocalUri(uri);
connector.setBrokerName(getBrokerName());
connector.setDurableDestinations(durableDestinations);
connector.start();
}
for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
ProxyConnector connector = iter.next();
connector.start();
}
for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
JmsConnector connector = iter.next();
connector.start();
}
for (Service service : services) {
configureService(service);
service.start();
}
}
}
}

View File

@ -218,7 +218,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
if(brokerInfo.isSlaveBroker()){
LOG.error("Slave has exception: " + e.getMessage()+" shutting down master now.", e);
try {
broker.stop();
doStop();
bService.stop();
}catch(Exception ex){
LOG.warn("Failed to stop the master",ex);

View File

@ -38,6 +38,7 @@ import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.ServiceStopper;
@ -63,6 +64,7 @@ public class MasterConnector implements Service, BrokerServiceAware {
private Transport remoteBroker;
private TransportConnector connector;
private AtomicBoolean started = new AtomicBoolean(false);
private AtomicBoolean stoppedBeforeStart = new AtomicBoolean(false);
private final IdGenerator idGenerator = new IdGenerator();
private String userName;
private String password;
@ -70,6 +72,8 @@ public class MasterConnector implements Service, BrokerServiceAware {
private SessionInfo sessionInfo;
private ProducerInfo producerInfo;
private final AtomicBoolean masterActive = new AtomicBoolean();
private BrokerInfo brokerInfo;
private boolean firstConnection=true;
public MasterConnector() {
}
@ -95,6 +99,15 @@ public class MasterConnector implements Service, BrokerServiceAware {
return masterActive.get();
}
protected void restartBridge() throws Exception {
localBroker.oneway(connectionInfo);
remoteBroker.oneway(connectionInfo);
localBroker.oneway(sessionInfo);
remoteBroker.oneway(sessionInfo);
remoteBroker.oneway(producerInfo);
remoteBroker.oneway(brokerInfo);
}
public void start() throws Exception {
if (!started.compareAndSet(false, true)) {
return;
@ -130,6 +143,35 @@ public class MasterConnector implements Service, BrokerServiceAware {
serviceRemoteException(error);
}
}
public void transportResumed() {
try{
if(!firstConnection){
localBroker = TransportFactory.connect(localURI);
localBroker.setTransportListener(new DefaultTransportListener() {
public void onCommand(Object command) {
}
public void onException(IOException error) {
if (started.get()) {
serviceLocalException(error);
}
}
});
localBroker.start();
restartBridge();
LOG.info("Slave connection between " + localBroker + " and " + remoteBroker + " has been reestablished.");
}else{
firstConnection=false;
}
}catch(IOException e){
LOG.error("MasterConnector failed to send BrokerInfo in transportResumed:", e);
}catch(Exception e){
LOG.error("MasterConnector failed to restart localBroker in transportResumed:", e);
}
}
});
try {
localBroker.start();
@ -138,8 +180,12 @@ public class MasterConnector implements Service, BrokerServiceAware {
masterActive.set(true);
} catch (Exception e) {
masterActive.set(false);
LOG.error("Failed to start network bridge: " + e, e);
}
if(!stoppedBeforeStart.get()){
LOG.error("Failed to start network bridge: " + e, e);
}else{
LOG.info("Slave stopped before connected to the master.");
}
}
}
protected void startBridge() throws Exception {
@ -149,15 +195,9 @@ public class MasterConnector implements Service, BrokerServiceAware {
connectionInfo.setUserName(userName);
connectionInfo.setPassword(password);
connectionInfo.setBrokerMasterConnector(true);
localBroker.oneway(connectionInfo);
remoteBroker.oneway(connectionInfo);
sessionInfo = new SessionInfo(connectionInfo, 1);
localBroker.oneway(sessionInfo);
remoteBroker.oneway(sessionInfo);
producerInfo = new ProducerInfo(sessionInfo, 1);
producerInfo.setResponseRequired(false);
remoteBroker.oneway(producerInfo);
BrokerInfo brokerInfo = null;
if (connector != null) {
brokerInfo = connector.getBrokerInfo();
} else {
@ -166,12 +206,12 @@ public class MasterConnector implements Service, BrokerServiceAware {
brokerInfo.setBrokerName(broker.getBrokerName());
brokerInfo.setPeerBrokerInfos(broker.getBroker().getPeerBrokerInfos());
brokerInfo.setSlaveBroker(true);
remoteBroker.oneway(brokerInfo);
restartBridge();
LOG.info("Slave connection between " + localBroker + " and " + remoteBroker + " has been established.");
}
public void stop() throws Exception {
if (!started.compareAndSet(true, false)) {
if (!started.compareAndSet(true, false)||!masterActive.get()) {
return;
}
masterActive.set(false);
@ -192,6 +232,15 @@ public class MasterConnector implements Service, BrokerServiceAware {
ss.throwFirstException();
}
}
public void stopBeforeConnected()throws Exception{
masterActive.set(false);
started.set(false);
stoppedBeforeStart.set(true);
ServiceStopper ss = new ServiceStopper();
ss.stop(localBroker);
ss.stop(remoteBroker);
}
protected void serviceRemoteException(IOException error) {
LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error);
@ -224,8 +273,12 @@ public class MasterConnector implements Service, BrokerServiceAware {
}
protected void serviceLocalException(Throwable error) {
LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error);
ServiceSupport.dispose(this);
if (!(error instanceof TransportDisposedIOException) || localBroker.isDisposed()){
LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error);
ServiceSupport.dispose(this);
}else{
LOG.info(error.getMessage());
}
}
/**
@ -289,4 +342,9 @@ public class MasterConnector implements Service, BrokerServiceAware {
broker.masterFailed();
ServiceSupport.dispose(this);
}
public boolean isStoppedBeforeStart() {
return stoppedBeforeStart.get();
}
}

View File

@ -75,7 +75,13 @@ public class BrokerView implements BrokerViewMBean {
public void stop() throws Exception {
brokerService.stop();
}
public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval)
throws Exception {
brokerService.stopGracefully(connectorName, queueName, timeout, pollInterval);
}
public long getTotalEnqueueCount() {
return broker.getDestinationStatistics().getEnqueues().getCount();
}

View File

@ -114,7 +114,10 @@ public interface BrokerViewMBean extends Service {
/**
* Stop the broker and all it's components.
*/
@MBeanInfo("Stop the broker and all its components.")
void stop() throws Exception;
@MBeanInfo("Poll for queues matching queueName are empty before stopping")
void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception;
@MBeanInfo("Topics (broadcasted 'queues'); generally system information.")
ObjectName[] getTopics();