Merge back the isSlave() JMS API. 

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1449077 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-02-22 15:32:26 +00:00
parent 806ec21eb6
commit d75e418c3d
5 changed files with 94 additions and 3 deletions

View File

@ -232,6 +232,7 @@ public class BrokerService implements Service {
private Throwable startException = null;
private boolean startAsync = false;
private Date startDate;
private boolean slave = true;
static {
String localHostName = "localhost";
@ -706,6 +707,7 @@ public class BrokerService implements Service {
}
}
stopAllConnectors(stopper);
this.slave = true;
// remove any VMTransports connected
// this has to be done after services are stopped,
// to avoid timing issue with discovery (spinning up a new instance)
@ -2354,6 +2356,7 @@ public class BrokerService implements Service {
this.transportConnectors.clear();
setTransportConnectors(al);
}
this.slave = false;
URI uri = getVmConnectorURI();
Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
map.put("network", "true");
@ -2821,4 +2824,9 @@ public class BrokerService implements Service {
public void setStartAsync(boolean startAsync) {
this.startAsync = startAsync;
}
public boolean isSlave() {
return this.slave;
}
}

View File

@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionMetaData;
@ -67,14 +68,17 @@ public class BrokerView implements BrokerViewMBean {
this.broker = broker;
}
@Override
public String getBrokerId() {
return safeGetBroker().getBrokerId().toString();
}
@Override
public String getBrokerName() {
return safeGetBroker().getBrokerName();
}
@Override
public String getBrokerVersion() {
return ActiveMQConnectionMetaData.PROVIDER_VERSION;
}
@ -84,6 +88,7 @@ public class BrokerView implements BrokerViewMBean {
return brokerService.getUptime();
}
@Override
public void gc() throws Exception {
brokerService.getBroker().gc();
try {
@ -93,35 +98,43 @@ public class BrokerView implements BrokerViewMBean {
}
}
@Override
public void start() throws Exception {
brokerService.start();
}
@Override
public void stop() throws Exception {
brokerService.stop();
}
@Override
public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval)
throws Exception {
brokerService.stopGracefully(connectorName, queueName, timeout, pollInterval);
}
@Override
public long getTotalEnqueueCount() {
return safeGetBroker().getDestinationStatistics().getEnqueues().getCount();
}
@Override
public long getTotalDequeueCount() {
return safeGetBroker().getDestinationStatistics().getDequeues().getCount();
}
@Override
public long getTotalConsumerCount() {
return safeGetBroker().getDestinationStatistics().getConsumers().getCount();
}
@Override
public long getTotalProducerCount() {
return safeGetBroker().getDestinationStatistics().getProducers().getCount();
}
@Override
public long getTotalMessageCount() {
return safeGetBroker().getDestinationStatistics().getMessages().getCount();
}
@ -130,138 +143,172 @@ public class BrokerView implements BrokerViewMBean {
return safeGetBroker().getDestinationStatistics().getMessagesCached().getCount();
}
@Override
public int getMemoryPercentUsage() {
return brokerService.getSystemUsage().getMemoryUsage().getPercentUsage();
}
@Override
public long getMemoryLimit() {
return brokerService.getSystemUsage().getMemoryUsage().getLimit();
}
@Override
public void setMemoryLimit(long limit) {
brokerService.getSystemUsage().getMemoryUsage().setLimit(limit);
}
@Override
public long getStoreLimit() {
return brokerService.getSystemUsage().getStoreUsage().getLimit();
}
@Override
public int getStorePercentUsage() {
return brokerService.getSystemUsage().getStoreUsage().getPercentUsage();
}
@Override
public long getTempLimit() {
return brokerService.getSystemUsage().getTempUsage().getLimit();
}
@Override
public int getTempPercentUsage() {
return brokerService.getSystemUsage().getTempUsage().getPercentUsage();
}
@Override
public long getJobSchedulerStoreLimit() {
return brokerService.getSystemUsage().getJobSchedulerUsage().getLimit();
}
@Override
public int getJobSchedulerStorePercentUsage() {
return brokerService.getSystemUsage().getJobSchedulerUsage().getPercentUsage();
}
@Override
public void setStoreLimit(long limit) {
brokerService.getSystemUsage().getStoreUsage().setLimit(limit);
}
@Override
public void setTempLimit(long limit) {
brokerService.getSystemUsage().getTempUsage().setLimit(limit);
}
@Override
public void setJobSchedulerStoreLimit(long limit) {
brokerService.getSystemUsage().getJobSchedulerUsage().setLimit(limit);
}
@Override
public void resetStatistics() {
safeGetBroker().getDestinationStatistics().reset();
}
@Override
public void enableStatistics() {
safeGetBroker().getDestinationStatistics().setEnabled(true);
}
@Override
public void disableStatistics() {
safeGetBroker().getDestinationStatistics().setEnabled(false);
}
@Override
public boolean isStatisticsEnabled() {
return safeGetBroker().getDestinationStatistics().isEnabled();
}
@Override
public boolean isPersistent() {
return brokerService.isPersistent();
}
@Override
public void terminateJVM(int exitCode) {
System.exit(exitCode);
}
@Override
public ObjectName[] getTopics() {
return safeGetBroker().getTopics();
}
@Override
public ObjectName[] getQueues() {
return safeGetBroker().getQueues();
}
@Override
public ObjectName[] getTemporaryTopics() {
return safeGetBroker().getTemporaryTopics();
}
@Override
public ObjectName[] getTemporaryQueues() {
return safeGetBroker().getTemporaryQueues();
}
@Override
public ObjectName[] getTopicSubscribers() {
return safeGetBroker().getTopicSubscribers();
}
@Override
public ObjectName[] getDurableTopicSubscribers() {
return safeGetBroker().getDurableTopicSubscribers();
}
@Override
public ObjectName[] getQueueSubscribers() {
return safeGetBroker().getQueueSubscribers();
}
@Override
public ObjectName[] getTemporaryTopicSubscribers() {
return safeGetBroker().getTemporaryTopicSubscribers();
}
@Override
public ObjectName[] getTemporaryQueueSubscribers() {
return safeGetBroker().getTemporaryQueueSubscribers();
}
@Override
public ObjectName[] getInactiveDurableTopicSubscribers() {
return safeGetBroker().getInactiveDurableTopicSubscribers();
}
@Override
public ObjectName[] getTopicProducers() {
return safeGetBroker().getTopicProducers();
}
@Override
public ObjectName[] getQueueProducers() {
return safeGetBroker().getQueueProducers();
}
@Override
public ObjectName[] getTemporaryTopicProducers() {
return safeGetBroker().getTemporaryTopicProducers();
}
@Override
public ObjectName[] getTemporaryQueueProducers() {
return safeGetBroker().getTemporaryQueueProducers();
}
@Override
public ObjectName[] getDynamicDestinationProducers() {
return safeGetBroker().getDynamicDestinationProducers();
}
@Override
public String addConnector(String discoveryAddress) throws Exception {
TransportConnector connector = brokerService.addConnector(discoveryAddress);
if (connector == null) {
@ -271,6 +318,7 @@ public class BrokerView implements BrokerViewMBean {
return connector.getName();
}
@Override
public String addNetworkConnector(String discoveryAddress) throws Exception {
NetworkConnector connector = brokerService.addNetworkConnector(discoveryAddress);
if (connector == null) {
@ -280,6 +328,7 @@ public class BrokerView implements BrokerViewMBean {
return connector.getName();
}
@Override
public boolean removeConnector(String connectorName) throws Exception {
TransportConnector connector = brokerService.getConnectorByName(connectorName);
if (connector == null) {
@ -289,6 +338,7 @@ public class BrokerView implements BrokerViewMBean {
return brokerService.removeConnector(connector);
}
@Override
public boolean removeNetworkConnector(String connectorName) throws Exception {
NetworkConnector connector = brokerService.getNetworkConnectorByName(connectorName);
if (connector == null) {
@ -298,22 +348,27 @@ public class BrokerView implements BrokerViewMBean {
return brokerService.removeNetworkConnector(connector);
}
@Override
public void addTopic(String name) throws Exception {
safeGetBroker().getContextBroker().addDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQTopic(name),true);
}
@Override
public void addQueue(String name) throws Exception {
safeGetBroker().getContextBroker().addDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQQueue(name),true);
}
@Override
public void removeTopic(String name) throws Exception {
safeGetBroker().getContextBroker().removeDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQTopic(name), 1000);
}
@Override
public void removeQueue(String name) throws Exception {
safeGetBroker().getContextBroker().removeDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQQueue(name), 1000);
}
@Override
public ObjectName createDurableSubscriber(String clientId, String subscriberName, String topicName,
String selector) throws Exception {
ConnectionContext context = new ConnectionContext();
@ -336,6 +391,7 @@ public class BrokerView implements BrokerViewMBean {
return null;
}
@Override
public void destroyDurableSubscriber(String clientId, String subscriberName) throws Exception {
RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
info.setClientId(clientId);
@ -347,6 +403,7 @@ public class BrokerView implements BrokerViewMBean {
}
// doc comment inherited from BrokerViewMBean
@Override
public void reloadLog4jProperties() throws Throwable {
// Avoid a direct dependency on log4j.. use reflection.
@ -379,6 +436,7 @@ public class BrokerView implements BrokerViewMBean {
}
}
@Override
public Map<String, String> getTransportConnectors() {
Map<String, String> answer = new HashMap<String, String>();
try {
@ -396,6 +454,7 @@ public class BrokerView implements BrokerViewMBean {
return brokerService.getTransportConnectorURIsAsMap().get(type);
}
@Override
@Deprecated
/**
* @deprecated use {@link #getTransportConnectors()} or {@link #getTransportConnectorByType(String)}
@ -405,6 +464,7 @@ public class BrokerView implements BrokerViewMBean {
return answer != null ? answer : "";
}
@Override
@Deprecated
/**
* @deprecated use {@link #getTransportConnectors()} or {@link #getTransportConnectorByType(String)}
@ -414,6 +474,7 @@ public class BrokerView implements BrokerViewMBean {
return answer != null ? answer : "";
}
@Override
@Deprecated
/**
* @deprecated use {@link #getTransportConnectors()} or {@link #getTransportConnectorByType(String)}
@ -423,6 +484,7 @@ public class BrokerView implements BrokerViewMBean {
return answer != null ? answer : "";
}
@Override
@Deprecated
/**
* @deprecated use {@link #getTransportConnectors()} or {@link #getTransportConnectorByType(String)}
@ -432,11 +494,13 @@ public class BrokerView implements BrokerViewMBean {
return answer != null ? answer : "";
}
@Override
public String getVMURL() {
URI answer = brokerService.getVmConnectorURI();
return answer != null ? answer.toString() : "";
}
@Override
public String getDataDirectory() {
File file = brokerService.getDataDirectoryFile();
try {
@ -446,6 +510,7 @@ public class BrokerView implements BrokerViewMBean {
}
}
@Override
public ObjectName getJMSJobScheduler() {
return this.jmsJobScheduler;
}
@ -454,6 +519,11 @@ public class BrokerView implements BrokerViewMBean {
this.jmsJobScheduler=name;
}
@Override
public boolean isSlave() {
return brokerService.isSlave();
}
private ManagedRegionBroker safeGetBroker() {
if (broker == null) {
throw new IllegalStateException("Broker is not yet started.");

View File

@ -123,6 +123,9 @@ public interface BrokerViewMBean extends Service {
@MBeanInfo("Messages are synchronized to disk.")
boolean isPersistent();
@MBeanInfo("Slave broker.")
boolean isSlave();
/**
* Shuts down the JVM.
*

View File

@ -49,6 +49,7 @@ abstract public class QueueMasterSlaveTestSupport extends JmsTopicSendReceiveWit
protected int failureCount = 50;
protected String uriString = "failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false&useExponentialBackOff=false";
@Override
protected void setUp() throws Exception {
setMaxTestTime(TimeUnit.MINUTES.toMillis(10));
setAutoFail(true);
@ -74,6 +75,7 @@ abstract public class QueueMasterSlaveTestSupport extends JmsTopicSendReceiveWit
return "org/apache/activemq/broker/ft/master.xml";
}
@Override
protected void tearDown() throws Exception {
super.tearDown();
master.stop();
@ -86,10 +88,12 @@ abstract public class QueueMasterSlaveTestSupport extends JmsTopicSendReceiveWit
master.stop();
}
@Override
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory(uriString);
}
@Override
protected void messageSent() throws Exception {
if (++inflightMessageCount == failureCount) {
Thread.sleep(1000);
@ -123,13 +127,16 @@ abstract public class QueueMasterSlaveTestSupport extends JmsTopicSendReceiveWit
MessageConsumer qConsumer = session.createConsumer(new ActiveMQQueue("Consumer.A.VirtualTopic.TA1"));
assertNull("No message there yet", qConsumer.receive(1000));
qConsumer.close();
assertTrue(!master.isSlave());
master.stop();
assertTrue("slave started", slaveStarted.await(15, TimeUnit.SECONDS));
assertTrue(!slave.get().isSlave());
final String text = "ForUWhenSlaveKicksIn";
producer.send(new ActiveMQTopic("VirtualTopic.TA1"), session.createTextMessage(text));
qConsumer = session.createConsumer(new ActiveMQQueue("Consumer.A.VirtualTopic.TA1"));
javax.jms.Message message = qConsumer.receive(4000);
assertNotNull("Get message after failover", message);
assertEquals("correct message", text, ((TextMessage)message).getText());

View File

@ -336,6 +336,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
assertTrue("broker is not a slave", !broker.isSlave());
// create 2 topics
broker.addTopic(getDestinationString() + "1 ");
broker.addTopic(" " + getDestinationString() + "2");
@ -534,6 +535,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
assertTrue("broker is not a slave", !broker.isSlave());
// create 2 topics
broker.addTopic(getDestinationString() + "1");
broker.addTopic(getDestinationString() + "2");
@ -585,6 +587,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
assertTrue("broker is not a slave", !broker.isSlave());
// create 2 topics
broker.addTopic(getDestinationString() + "1");
broker.addTopic(getDestinationString() + "2");