git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1234010 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-01-20 17:12:54 +00:00
parent e3a1fc9b35
commit 999dc0df20
2 changed files with 406 additions and 42 deletions

View File

@ -23,6 +23,7 @@ import java.lang.reflect.Method;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.URI; import java.net.URI;
import java.net.URL; import java.net.URL;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import javax.management.ObjectName; import javax.management.ObjectName;
@ -65,11 +66,11 @@ public class BrokerView implements BrokerViewMBean {
} }
public String getBrokerId() { public String getBrokerId() {
return broker.getBrokerId().toString(); return safeGetBroker().getBrokerId().toString();
} }
public String getBrokerName() { public String getBrokerName() {
return broker.getBrokerName(); return safeGetBroker().getBrokerName();
} }
public String getBrokerVersion() { public String getBrokerVersion() {
@ -98,29 +99,28 @@ public class BrokerView implements BrokerViewMBean {
brokerService.stopGracefully(connectorName, queueName, timeout, pollInterval); brokerService.stopGracefully(connectorName, queueName, timeout, pollInterval);
} }
public long getTotalEnqueueCount() { public long getTotalEnqueueCount() {
return broker.getDestinationStatistics().getEnqueues().getCount(); return safeGetBroker().getDestinationStatistics().getEnqueues().getCount();
} }
public long getTotalDequeueCount() { public long getTotalDequeueCount() {
return broker.getDestinationStatistics().getDequeues().getCount(); return safeGetBroker().getDestinationStatistics().getDequeues().getCount();
} }
public long getTotalConsumerCount() { public long getTotalConsumerCount() {
return broker.getDestinationStatistics().getConsumers().getCount(); return safeGetBroker().getDestinationStatistics().getConsumers().getCount();
} }
public long getTotalProducerCount() { public long getTotalProducerCount() {
return broker.getDestinationStatistics().getProducers().getCount(); return safeGetBroker().getDestinationStatistics().getProducers().getCount();
} }
public long getTotalMessageCount() { public long getTotalMessageCount() {
return broker.getDestinationStatistics().getMessages().getCount(); return safeGetBroker().getDestinationStatistics().getMessages().getCount();
} }
public long getTotalMessagesCached() { public long getTotalMessagesCached() {
return broker.getDestinationStatistics().getMessagesCached().getCount(); return safeGetBroker().getDestinationStatistics().getMessagesCached().getCount();
} }
public int getMemoryPercentUsage() { public int getMemoryPercentUsage() {
@ -143,7 +143,6 @@ public class BrokerView implements BrokerViewMBean {
return brokerService.getSystemUsage().getStoreUsage().getPercentUsage(); return brokerService.getSystemUsage().getStoreUsage().getPercentUsage();
} }
public long getTempLimit() { public long getTempLimit() {
return brokerService.getSystemUsage().getTempUsage().getLimit(); return brokerService.getSystemUsage().getTempUsage().getLimit();
} }
@ -160,21 +159,20 @@ public class BrokerView implements BrokerViewMBean {
brokerService.getSystemUsage().getTempUsage().setLimit(limit); brokerService.getSystemUsage().getTempUsage().setLimit(limit);
} }
public void resetStatistics() { public void resetStatistics() {
broker.getDestinationStatistics().reset(); safeGetBroker().getDestinationStatistics().reset();
} }
public void enableStatistics() { public void enableStatistics() {
broker.getDestinationStatistics().setEnabled(true); safeGetBroker().getDestinationStatistics().setEnabled(true);
} }
public void disableStatistics() { public void disableStatistics() {
broker.getDestinationStatistics().setEnabled(false); safeGetBroker().getDestinationStatistics().setEnabled(false);
} }
public boolean isStatisticsEnabled() { public boolean isStatisticsEnabled() {
return broker.getDestinationStatistics().isEnabled(); return safeGetBroker().getDestinationStatistics().isEnabled();
} }
public boolean isPersistent() { public boolean isPersistent() {
@ -190,111 +188,121 @@ public class BrokerView implements BrokerViewMBean {
} }
public ObjectName[] getTopics() { public ObjectName[] getTopics() {
return broker.getTopics(); return safeGetBroker().getTopics();
} }
public ObjectName[] getQueues() { public ObjectName[] getQueues() {
return broker.getQueues(); return safeGetBroker().getQueues();
} }
public ObjectName[] getTemporaryTopics() { public ObjectName[] getTemporaryTopics() {
return broker.getTemporaryTopics(); return safeGetBroker().getTemporaryTopics();
} }
public ObjectName[] getTemporaryQueues() { public ObjectName[] getTemporaryQueues() {
return broker.getTemporaryQueues(); return safeGetBroker().getTemporaryQueues();
} }
public ObjectName[] getTopicSubscribers() { public ObjectName[] getTopicSubscribers() {
return broker.getTopicSubscribers(); return safeGetBroker().getTopicSubscribers();
} }
public ObjectName[] getDurableTopicSubscribers() { public ObjectName[] getDurableTopicSubscribers() {
return broker.getDurableTopicSubscribers(); return safeGetBroker().getDurableTopicSubscribers();
} }
public ObjectName[] getQueueSubscribers() { public ObjectName[] getQueueSubscribers() {
return broker.getQueueSubscribers(); return safeGetBroker().getQueueSubscribers();
} }
public ObjectName[] getTemporaryTopicSubscribers() { public ObjectName[] getTemporaryTopicSubscribers() {
return broker.getTemporaryTopicSubscribers(); return safeGetBroker().getTemporaryTopicSubscribers();
} }
public ObjectName[] getTemporaryQueueSubscribers() { public ObjectName[] getTemporaryQueueSubscribers() {
return broker.getTemporaryQueueSubscribers(); return safeGetBroker().getTemporaryQueueSubscribers();
} }
public ObjectName[] getInactiveDurableTopicSubscribers() { public ObjectName[] getInactiveDurableTopicSubscribers() {
return broker.getInactiveDurableTopicSubscribers(); return safeGetBroker().getInactiveDurableTopicSubscribers();
} }
public ObjectName[] getTopicProducers() { public ObjectName[] getTopicProducers() {
return broker.getTopicProducers(); return safeGetBroker().getTopicProducers();
} }
public ObjectName[] getQueueProducers() { public ObjectName[] getQueueProducers() {
return broker.getQueueProducers(); return safeGetBroker().getQueueProducers();
} }
public ObjectName[] getTemporaryTopicProducers() { public ObjectName[] getTemporaryTopicProducers() {
return broker.getTemporaryTopicProducers(); return safeGetBroker().getTemporaryTopicProducers();
} }
public ObjectName[] getTemporaryQueueProducers() { public ObjectName[] getTemporaryQueueProducers() {
return broker.getTemporaryQueueProducers(); return safeGetBroker().getTemporaryQueueProducers();
} }
public ObjectName[] getDynamicDestinationProducers() { public ObjectName[] getDynamicDestinationProducers() {
return broker.getDynamicDestinationProducers(); return safeGetBroker().getDynamicDestinationProducers();
} }
public String addConnector(String discoveryAddress) throws Exception { public String addConnector(String discoveryAddress) throws Exception {
TransportConnector connector = brokerService.addConnector(discoveryAddress); TransportConnector connector = brokerService.addConnector(discoveryAddress);
if (connector == null) {
throw new NoSuchElementException("Not connector matched the given name: " + discoveryAddress);
}
connector.start(); connector.start();
return connector.getName(); return connector.getName();
} }
public String addNetworkConnector(String discoveryAddress) throws Exception { public String addNetworkConnector(String discoveryAddress) throws Exception {
NetworkConnector connector = brokerService.addNetworkConnector(discoveryAddress); NetworkConnector connector = brokerService.addNetworkConnector(discoveryAddress);
if (connector == null) {
throw new NoSuchElementException("Not connector matched the given name: " + discoveryAddress);
}
connector.start(); connector.start();
return connector.getName(); return connector.getName();
} }
public boolean removeConnector(String connectorName) throws Exception { public boolean removeConnector(String connectorName) throws Exception {
TransportConnector connector = brokerService.getConnectorByName(connectorName); TransportConnector connector = brokerService.getConnectorByName(connectorName);
if (connector == null) {
throw new NoSuchElementException("Not connector matched the given name: " + connectorName);
}
connector.stop(); connector.stop();
return brokerService.removeConnector(connector); return brokerService.removeConnector(connector);
} }
public boolean removeNetworkConnector(String connectorName) throws Exception { public boolean removeNetworkConnector(String connectorName) throws Exception {
NetworkConnector connector = brokerService.getNetworkConnectorByName(connectorName); NetworkConnector connector = brokerService.getNetworkConnectorByName(connectorName);
if (connector == null) {
throw new NoSuchElementException("Not connector matched the given name: " + connectorName);
}
connector.stop(); connector.stop();
return brokerService.removeNetworkConnector(connector); return brokerService.removeNetworkConnector(connector);
} }
public void addTopic(String name) throws Exception { public void addTopic(String name) throws Exception {
broker.getContextBroker().addDestination(BrokerSupport.getConnectionContext(broker.getContextBroker()), new ActiveMQTopic(name),true); safeGetBroker().getContextBroker().addDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQTopic(name),true);
} }
public void addQueue(String name) throws Exception { public void addQueue(String name) throws Exception {
broker.getContextBroker().addDestination(BrokerSupport.getConnectionContext(broker.getContextBroker()), new ActiveMQQueue(name),true); safeGetBroker().getContextBroker().addDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQQueue(name),true);
} }
public void removeTopic(String name) throws Exception { public void removeTopic(String name) throws Exception {
broker.getContextBroker().removeDestination(BrokerSupport.getConnectionContext(broker.getContextBroker()), new ActiveMQTopic(name), safeGetBroker().getContextBroker().removeDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQTopic(name), 1000);
1000);
} }
public void removeQueue(String name) throws Exception { public void removeQueue(String name) throws Exception {
broker.getContextBroker().removeDestination(BrokerSupport.getConnectionContext(broker.getContextBroker()), new ActiveMQQueue(name), safeGetBroker().getContextBroker().removeDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQQueue(name), 1000);
1000);
} }
public ObjectName createDurableSubscriber(String clientId, String subscriberName, String topicName, public ObjectName createDurableSubscriber(String clientId, String subscriberName, String topicName,
String selector) throws Exception { String selector) throws Exception {
ConnectionContext context = new ConnectionContext(); ConnectionContext context = new ConnectionContext();
context.setBroker(broker); context.setBroker(safeGetBroker());
context.setClientId(clientId); context.setClientId(clientId);
ConsumerInfo info = new ConsumerInfo(); ConsumerInfo info = new ConsumerInfo();
ConsumerId consumerId = new ConsumerId(); ConsumerId consumerId = new ConsumerId();
@ -305,8 +313,8 @@ public class BrokerView implements BrokerViewMBean {
info.setDestination(new ActiveMQTopic(topicName)); info.setDestination(new ActiveMQTopic(topicName));
info.setSubscriptionName(subscriberName); info.setSubscriptionName(subscriberName);
info.setSelector(selector); info.setSelector(selector);
Subscription subscription = broker.addConsumer(context, info); Subscription subscription = safeGetBroker().addConsumer(context, info);
broker.removeConsumer(context, info); safeGetBroker().removeConsumer(context, info);
if (subscription != null) { if (subscription != null) {
return subscription.getObjectName(); return subscription.getObjectName();
} }
@ -318,9 +326,9 @@ public class BrokerView implements BrokerViewMBean {
info.setClientId(clientId); info.setClientId(clientId);
info.setSubscriptionName(subscriberName); info.setSubscriptionName(subscriberName);
ConnectionContext context = new ConnectionContext(); ConnectionContext context = new ConnectionContext();
context.setBroker(broker); context.setBroker(safeGetBroker());
context.setClientId(clientId); context.setClientId(clientId);
broker.removeSubscription(context, info); safeGetBroker().removeSubscription(context, info);
} }
// doc comment inherited from BrokerViewMBean // doc comment inherited from BrokerViewMBean
@ -356,7 +364,6 @@ public class BrokerView implements BrokerViewMBean {
} }
} }
public String getOpenWireURL() { public String getOpenWireURL() {
String answer = brokerService.getTransportConnectorURIsAsMap().get("tcp"); String answer = brokerService.getTransportConnectorURIsAsMap().get("tcp");
return answer != null ? answer : ""; return answer != null ? answer : "";
@ -398,4 +405,12 @@ public class BrokerView implements BrokerViewMBean {
public void setJMSJobScheduler(ObjectName name) { public void setJMSJobScheduler(ObjectName name) {
this.jmsJobScheduler=name; this.jmsJobScheduler=name;
} }
private ManagedRegionBroker safeGetBroker() {
if (broker == null) {
throw new IllegalStateException("Broker is not yet started.");
}
return broker;
}
} }

View File

@ -0,0 +1,349 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.jmx;
import static org.junit.Assert.*;
import java.io.File;
import java.util.NoSuchElementException;
import java.util.concurrent.CountDownLatch;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Used to verify that the BrokerView accessed while the BrokerSerivce is waiting
* for a Slow Store startup to complete doesn't throw unexpected NullPointerExceptions.
*/
public class BrokerViewSlowStoreStartupTest {
private static final Logger LOG = LoggerFactory.getLogger(BrokerViewSlowStoreStartupTest.class);
private final CountDownLatch holdStoreStart = new CountDownLatch(1);
private final String brokerName = "brokerViewTest";
private BrokerService broker;
private Thread startThread;
private BrokerService createBroker() throws Exception {
BrokerService broker = new BrokerService();
broker.setBrokerName(brokerName);
KahaDBStore kaha = new KahaDBStore() {
@Override
public void start() throws Exception {
LOG.info("Test KahaDB class is waiting for signal to complete its start()");
holdStoreStart.await();
super.start();
LOG.info("Test KahaDB class is completed its start()");
}
};
kaha.setDirectory(new File("target/activemq-data/kahadb"));
kaha.deleteAllMessages();
broker.setPersistenceAdapter(kaha);
broker.setUseJmx(true);
return broker;
}
@Before
public void setUp() throws Exception {
broker = createBroker();
startThread = new Thread(new Runnable() {
@Override
public void run() {
try {
broker.start();
} catch(Exception e) {
}
}
});
startThread.start();
}
@After
public void tearDown() throws Exception {
// ensure we don't keep the broker held if an exception occurs somewhere.
holdStoreStart.countDown();
startThread.join();
if (broker != null) {
broker.stop();
broker.waitUntilStopped();
}
}
@Test(timeout=120000)
public void testBrokerViewOnSlowStoreStart() throws Exception {
// Ensure we have an Admin View.
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return (broker.getAdminView()) != null;
}
}));
BrokerView view = broker.getAdminView();
try {
view.getBrokerName();
fail("Should have thrown an IllegalStateException");
} catch(IllegalStateException e) {
}
try {
view.getBrokerId();
fail("Should have thrown an IllegalStateException");
} catch(IllegalStateException e) {
}
try {
view.getTotalEnqueueCount();
fail("Should have thrown an IllegalStateException");
} catch(IllegalStateException e) {
}
try {
view.getTotalDequeueCount();
fail("Should have thrown an IllegalStateException");
} catch(IllegalStateException e) {
}
try {
view.getTotalConsumerCount();
fail("Should have thrown an IllegalStateException");
} catch(IllegalStateException e) {
}
try {
view.getTotalProducerCount();
fail("Should have thrown an IllegalStateException");
} catch(IllegalStateException e) {
}
try {
view.getTotalMessageCount();
fail("Should have thrown an IllegalStateException");
} catch(IllegalStateException e) {
}
try {
view.getTotalMessagesCached();
fail("Should have thrown an IllegalStateException");
} catch(IllegalStateException e) {
}
try {
view.resetStatistics();
fail("Should have thrown an IllegalStateException");
} catch(IllegalStateException e) {
}
try {
view.enableStatistics();
fail("Should have thrown an IllegalStateException");
} catch(IllegalStateException e) {
}
try {
view.disableStatistics();
fail("Should have thrown an IllegalStateException");
} catch(IllegalStateException e) {
}
try {
view.isStatisticsEnabled();
fail("Should have thrown an IllegalStateException");
} catch(IllegalStateException e) {
}
try {
view.getTopics();
fail("Should have thrown an IllegalStateException");
} catch(IllegalStateException e) {
}
try {
view.getQueues();
fail("Should have thrown an IllegalStateException");
} catch(IllegalStateException e) {
}
try {
view.getTemporaryTopics();
fail("Should have thrown an IllegalStateException");
} catch(IllegalStateException e) {
}
try {
view.getTemporaryQueues();
fail("Should have thrown an IllegalStateException");
} catch(IllegalStateException e) {
}
try {
view.getTopicSubscribers();
fail("Should have thrown an IllegalStateException");
} catch(IllegalStateException e) {
}
try {
view.getDurableTopicSubscribers();
fail("Should have thrown an IllegalStateException");
} catch(IllegalStateException e) {
}
try {
view.getQueueSubscribers();
fail("Should have thrown an IllegalStateException");
} catch(IllegalStateException e) {
}
try {
view.getTemporaryTopicSubscribers();
fail("Should have thrown an IllegalStateException");
} catch(IllegalStateException e) {
}
try {
view.getTemporaryQueueSubscribers();
fail("Should have thrown an IllegalStateException");
} catch(IllegalStateException e) {
}
try {
view.getInactiveDurableTopicSubscribers();
fail("Should have thrown an IllegalStateException");
} catch(IllegalStateException e) {
}
try {
view.getTopicProducers();
fail("Should have thrown an IllegalStateException");
} catch(IllegalStateException e) {
}
try {
view.getQueueProducers();
fail("Should have thrown an IllegalStateException");
} catch(IllegalStateException e) {
}
try {
view.getTemporaryTopicProducers();
fail("Should have thrown an IllegalStateException");
} catch(IllegalStateException e) {
}
try {
view.getTemporaryQueueProducers();
fail("Should have thrown an IllegalStateException");
} catch(IllegalStateException e) {
}
try {
view.getDynamicDestinationProducers();
fail("Should have thrown an IllegalStateException");
} catch(IllegalStateException e) {
}
try {
view.removeConnector("tcp");
fail("Should have thrown an NoSuchElementException");
} catch(NoSuchElementException e) {
}
try {
view.removeNetworkConnector("tcp");
fail("Should have thrown an NoSuchElementException");
} catch(NoSuchElementException e) {
}
try {
view.addTopic("TEST");
fail("Should have thrown an IllegalStateException");
} catch(IllegalStateException e) {
}
try {
view.addQueue("TEST");
fail("Should have thrown an IllegalStateException");
} catch(IllegalStateException e) {
}
try {
view.removeTopic("TEST");
fail("Should have thrown an IllegalStateException");
} catch(IllegalStateException e) {
}
try {
view.removeQueue("TEST");
fail("Should have thrown an IllegalStateException");
} catch(IllegalStateException e) {
}
try {
view.createDurableSubscriber("1", "2", "3","4");
fail("Should have thrown an IllegalStateException");
} catch(IllegalStateException e) {
}
try {
view.destroyDurableSubscriber("1", "2");
fail("Should have thrown an IllegalStateException");
} catch(IllegalStateException e) {
}
holdStoreStart.countDown();
startThread.join();
assertNotNull(view.getBroker());
try {
view.getBrokerName();
} catch(Exception e) {
fail("caught an exception getting the Broker property: " + e.getClass().getName());
}
try {
view.getBrokerId();
} catch(IllegalStateException e) {
fail("caught an exception getting the Broker property: " + e.getClass().getName());
}
try {
view.getTotalEnqueueCount();
} catch(IllegalStateException e) {
fail("caught an exception getting the Broker property: " + e.getClass().getName());
}
}
}