Add connection counts to Broker mbean
This commit is contained in:
Timothy Bish 2014-01-28 14:07:39 -05:00
parent 713250f5f0
commit fde22a8496
5 changed files with 170 additions and 47 deletions

View File

@ -45,6 +45,8 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@ -56,7 +58,21 @@ import org.apache.activemq.ConfigurationException;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
import org.apache.activemq.broker.jmx.*;
import org.apache.activemq.broker.jmx.AnnotatedMBean;
import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.activemq.broker.jmx.ConnectorView;
import org.apache.activemq.broker.jmx.ConnectorViewMBean;
import org.apache.activemq.broker.jmx.HealthView;
import org.apache.activemq.broker.jmx.HealthViewMBean;
import org.apache.activemq.broker.jmx.JmsConnectorView;
import org.apache.activemq.broker.jmx.JobSchedulerView;
import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
import org.apache.activemq.broker.jmx.ManagedRegionBroker;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.jmx.NetworkConnectorView;
import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
import org.apache.activemq.broker.jmx.ProxyConnectorView;
import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFactory;
@ -94,7 +110,16 @@ import org.apache.activemq.transport.TransportFactorySupport;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.vm.VMTransportFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.*;
import org.apache.activemq.util.BrokerSupport;
import org.apache.activemq.util.DefaultIOExceptionHandler;
import org.apache.activemq.util.IOExceptionHandler;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ThreadPoolUtils;
import org.apache.activemq.util.TimeUtils;
import org.apache.activemq.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@ -206,6 +231,8 @@ public class BrokerService implements Service {
private boolean networkConnectorStartAsync = false;
private boolean allowTempAutoCreationOnSend;
private JobSchedulerStore jobSchedulerStore;
private final AtomicLong totalConnections = new AtomicLong();
private final AtomicInteger currentConnections = new AtomicInteger();
private long offlineDurableSubscriberTimeout = -1;
private long offlineDurableSubscriberTaskSchedule = 300000;
@ -2941,4 +2968,30 @@ public class BrokerService implements Service {
public void setStoreOpenWireVersion(int storeOpenWireVersion) {
this.storeOpenWireVersion = storeOpenWireVersion;
}
/**
* @return the current number of connections on this Broker.
*/
public int getCurrentConnections() {
return this.currentConnections.get();
}
/**
* @return the total number of connections this broker has handled since startup.
*/
public long getTotalConnections() {
return this.totalConnections.get();
}
public void incrementCurrentConnections() {
this.currentConnections.incrementAndGet();
}
public void decrementCurrentConnections() {
this.currentConnections.decrementAndGet();
}
public void incrementTotalConnections() {
this.totalConnections.incrementAndGet();
}
}

View File

@ -88,6 +88,16 @@ public class BrokerView implements BrokerViewMBean {
return brokerService.getUptime();
}
@Override
public int getCurrentConnectionsCount() {
return brokerService.getCurrentConnections();
}
@Override
public long getTotalConnectionsCount() {
return brokerService.getTotalConnections();
}
@Override
public void gc() throws Exception {
brokerService.getBroker().gc();
@ -148,6 +158,7 @@ public class BrokerView implements BrokerViewMBean {
/**
* @return the average size of a message (bytes)
*/
@Override
public double getAverageMessageSize() {
return safeGetBroker().getDestinationStatistics().getMessageSize().getAverageSize();
}
@ -155,6 +166,7 @@ public class BrokerView implements BrokerViewMBean {
/**
* @return the max size of a message (bytes)
*/
@Override
public long getMaxMessageSize() {
return safeGetBroker().getDestinationStatistics().getMessageSize().getMaxSize();
}
@ -162,11 +174,11 @@ public class BrokerView implements BrokerViewMBean {
/**
* @return the min size of a message (bytes)
*/
@Override
public long getMinMessageSize() {
return safeGetBroker().getDestinationStatistics().getMessageSize().getMinSize();
}
public long getTotalMessagesCached() {
return safeGetBroker().getDestinationStatistics().getMessagesCached().getCount();
}
@ -431,7 +443,6 @@ public class BrokerView implements BrokerViewMBean {
brokerService.getBroker().removeSubscription(context, info);
}
// doc comment inherited from BrokerViewMBean
@Override
public void reloadLog4jProperties() throws Throwable {

View File

@ -19,6 +19,7 @@ package org.apache.activemq.broker.jmx;
import java.util.Map;
import javax.management.ObjectName;
import org.apache.activemq.Service;
@ -52,6 +53,16 @@ public interface BrokerViewMBean extends Service {
@MBeanInfo("Uptime of the broker.")
String getUptime();
/**
* @return The current number of active connections on this Broker.
*/
int getCurrentConnectionsCount();
/**
* @return The total number of connections serviced since this Broker was started.
*/
long getTotalConnectionsCount();
/**
* The Broker will flush it's caches so that the garbage collector can
* reclaim more memory.
@ -88,7 +99,6 @@ public interface BrokerViewMBean extends Service {
@MBeanInfo("Number of unacknowledged messages on the broker.")
long getTotalMessageCount();
@MBeanInfo("Average message size on this broker")
double getAverageMessageSize();
@ -121,7 +131,7 @@ public interface BrokerViewMBean extends Service {
long getTempLimit();
void setTempLimit(@MBeanInfo("bytes") long limit);
@MBeanInfo("Percent of job store limit used.")
int getJobSchedulerStorePercentUsage();
@ -148,6 +158,7 @@ public interface BrokerViewMBean extends Service {
/**
* Stop the broker and all it's components.
*/
@Override
@MBeanInfo("Stop the broker and all its components.")
void stop() throws Exception;
@ -324,5 +335,4 @@ public interface BrokerViewMBean extends Service {
@MBeanInfo("JMSJobScheduler")
ObjectName getJMSJobScheduler();
}

View File

@ -61,6 +61,7 @@ import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
@ -225,6 +226,19 @@ public class ManagedRegionBroker extends RegionBroker {
}
}
@Override
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
super.addConnection(context, info);
this.contextBroker.getBrokerService().incrementCurrentConnections();
this.contextBroker.getBrokerService().incrementTotalConnections();
}
@Override
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
super.removeConnection(context, info, error);
this.contextBroker.getBrokerService().decrementCurrentConnections();
}
@Override
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
Subscription sub = super.addConsumer(context, info);
@ -440,7 +454,6 @@ public class ManagedRegionBroker extends RegionBroker {
LOG.warn("Failed to register MBean {}", key);
LOG.debug("Failure reason: ", e);
}
}
protected void unregisterSubscription(ObjectName key, boolean addToInactive) throws Exception {

View File

@ -24,11 +24,26 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import javax.jms.*;
import javax.management.*;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectInstance;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
import junit.textui.TestRunner;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
@ -78,7 +93,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
public void testConnectors() throws Exception{
ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
assertEquals("openwire URL port doesn't equal bind Address",
new URI(broker.getTransportConnectorByType("tcp")).getPort(),
@ -105,7 +120,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
CompositeData[] compdatalist = queue.browse();
int initialQueueSize = compdatalist.length;
@ -143,7 +158,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
echo("Now browsing the second queue");
queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + newDestination );
QueueViewMBean queueNew = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
QueueViewMBean queueNew = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
long newQueuesize = queueNew.getQueueSize();
echo("Second queue size: " + newQueuesize);
@ -223,7 +238,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
useConnection(connection);
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
long initialQueueSize = queue.getQueueSize();
echo("current queue size: " + initialQueueSize);
@ -245,7 +260,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
Thread.sleep(1000);
ObjectName dlqQueueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME );
QueueViewMBean dlq = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, dlqQueueViewMBeanName, QueueViewMBean.class, true);
QueueViewMBean dlq = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, dlqQueueViewMBeanName, QueueViewMBean.class, true);
long initialDlqSize = dlq.getQueueSize();
CompositeData[] compdatalist = dlq.browse();
@ -298,14 +313,14 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString() );
QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
String newDestination = getSecondDestinationString();
queue.moveMatchingMessagesTo("counter > 2", newDestination);
queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + newDestination);
queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
int movedSize = MESSAGE_COUNT-3;
assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize());
@ -382,19 +397,18 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
assertEquals("topic3 Durable subscriber count", 1, topic3.getConsumerCount());
}
@SuppressWarnings("rawtypes")
protected void assertSendViaMBean() throws Exception {
String queueName = getDestinationString() + ".SendMBBean";
ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
echo("Create QueueView MBean...");
BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
broker.addQueue(queueName);
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + queueName);
echo("Create QueueView MBean...");
QueueViewMBean proxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
QueueViewMBean proxy = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
proxy.purge();
@ -422,6 +436,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
browseAndVerifyTypes(proxy, false);
}
@SuppressWarnings("rawtypes")
private void browseAndVerifyTypes(QueueViewMBean proxy, boolean allStrings) throws Exception {
CompositeData[] compdatalist = proxy.browse();
if (compdatalist.length == 0) {
@ -479,13 +494,13 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
echo("Create QueueView MBean...");
BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
broker.addQueue(queueName);
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + queueName);
echo("Create QueueView MBean...");
QueueViewMBean proxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
QueueViewMBean proxy = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
proxy.purge();
@ -520,7 +535,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
echo("Create QueueView MBean...");
QueueViewMBean proxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
QueueViewMBean proxy = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
long concount = proxy.getConsumerCount();
echo("Consumer Count :" + concount);
@ -570,7 +585,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
// lets create a new topic
ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
echo("Create QueueView MBean...");
BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
broker.addTopic(getDestinationString());
@ -593,7 +608,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
protected void assertConsumerCounts() throws Exception {
ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
assertTrue("broker is not a slave", !broker.isSlave());
// create 2 topics
@ -602,8 +617,8 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
ObjectName topicObjName1 = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=" + getDestinationString() + "1");
ObjectName topicObjName2 = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=" + getDestinationString() + "2");
TopicViewMBean topic1 = (TopicViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName1, TopicViewMBean.class, true);
TopicViewMBean topic2 = (TopicViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName2, TopicViewMBean.class, true);
TopicViewMBean topic1 = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName1, TopicViewMBean.class, true);
TopicViewMBean topic2 = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName2, TopicViewMBean.class, true);
assertEquals("topic1 Durable subscriber count", 0, topic1.getConsumerCount());
assertEquals("topic2 Durable subscriber count", 0, topic2.getConsumerCount());
@ -645,7 +660,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
protected void assertProducerCounts() throws Exception {
ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
assertTrue("broker is not a slave", !broker.isSlave());
// create 2 topics
@ -654,8 +669,8 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
ObjectName topicObjName1 = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=" + getDestinationString() + "1");
ObjectName topicObjName2 = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=" + getDestinationString() + "2");
TopicViewMBean topic1 = (TopicViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName1, TopicViewMBean.class, true);
TopicViewMBean topic2 = (TopicViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName2, TopicViewMBean.class, true);
TopicViewMBean topic1 = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName1, TopicViewMBean.class, true);
TopicViewMBean topic2 = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName2, TopicViewMBean.class, true);
assertEquals("topic1 Producer count", 0, topic1.getProducerCount());
assertEquals("topic2 Producer count", 0, topic2.getProducerCount());
@ -737,6 +752,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
return objectName;
}
@Override
protected void setUp() throws Exception {
bindAddress = "tcp://localhost:0";
useTopic = false;
@ -745,6 +761,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
mbeanServer = managementContext.getMBeanServer();
}
@Override
protected void tearDown() throws Exception {
if (waitForKeyPress) {
// We are running from the command line so let folks browse the
@ -768,6 +785,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
}
@Override
protected BrokerService createBroker() throws Exception {
BrokerService answer = new BrokerService();
answer.setPersistent(false);
@ -855,7 +873,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
connection = connectionFactory.createConnection();
ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
assertEquals(0, broker.getDynamicDestinationProducers().length);
@ -873,7 +891,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
ObjectName viewName = broker.getDynamicDestinationProducers()[0];
assertNotNull(viewName);
ProducerViewMBean view = (ProducerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, viewName, ProducerViewMBean.class, true);
ProducerViewMBean view = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, viewName, ProducerViewMBean.class, true);
assertNotNull(view);
assertEquals("NOTSET", view.getDestinationName());
@ -940,7 +958,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
CompositeData[] compdatalist = queue.browse();
int initialQueueSize = compdatalist.length;
@ -966,7 +984,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
public void testDestinationOptionsAreVisible() throws Exception {
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + QUEUE_WITH_OPTIONS );
QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
assertEquals("name match", QUEUE_WITH_OPTIONS, queue.getName());
@ -992,7 +1010,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
MessageProducer producer = session.createProducer(queue);
ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
Thread.sleep(100);
@ -1053,7 +1071,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
echo("Create QueueView MBean...");
BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
assertEquals("Durable subscriber count", 0, broker.getDurableTopicSubscribers().length);
assertEquals("Durable subscriber count", 0, broker.getInactiveDurableTopicSubscribers().length);
@ -1108,7 +1126,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
MessageConsumer durable = session.createDurableSubscriber(topic, "Durable");
ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
Thread.sleep(100);
@ -1117,7 +1135,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
assertTrue(broker.getQueueSubscribers().length == 1);
ObjectName producerName = broker.getQueueProducers()[0];
ProducerViewMBean producerView = (ProducerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, producerName, ProducerViewMBean.class, true);
ProducerViewMBean producerView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, producerName, ProducerViewMBean.class, true);
assertNotNull(producerView);
if (expect) {
@ -1127,7 +1145,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
}
for (ObjectName name : broker.getTopicSubscribers()) {
SubscriptionViewMBean subscriberView = (SubscriptionViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, SubscriptionViewMBean.class, true);
SubscriptionViewMBean subscriberView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, SubscriptionViewMBean.class, true);
if (expect) {
assertEquals("admin", subscriberView.getUserName());
} else {
@ -1136,7 +1154,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
}
for (ObjectName name : broker.getQueueSubscribers()) {
SubscriptionViewMBean subscriberView = (SubscriptionViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, SubscriptionViewMBean.class, true);
SubscriptionViewMBean subscriberView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, SubscriptionViewMBean.class, true);
if (expect) {
assertEquals("admin", subscriberView.getUserName());
} else {
@ -1152,7 +1170,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
if (name.toString().endsWith("connectionName=MBeanTest")) {
ConnectionViewMBean connectionView =
(ConnectionViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, ConnectionViewMBean.class, true);
MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, ConnectionViewMBean.class, true);
assertNotNull(connectionView);
if (expect) {
@ -1175,14 +1193,14 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
String newDestination = getSecondDestinationString();
queue.moveMatchingMessagesTo("", newDestination);
queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + newDestination);
queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
int movedSize = MESSAGE_COUNT;
assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize());
@ -1208,20 +1226,36 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
}
public void testConnectionCounts() throws Exception {
ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
assertEquals(0, broker.getCurrentConnectionsCount());
connection = connectionFactory.createConnection();
useConnection(connection);
assertEquals(1, broker.getCurrentConnectionsCount());
connection.close();
assertEquals(0, broker.getCurrentConnectionsCount());
assertEquals(1, broker.getTotalConnectionsCount());
}
public void testCopyMessagesToRetainOrder() throws Exception {
connection = connectionFactory.createConnection();
useConnection(connection);
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
String newDestination = getSecondDestinationString();
queue.copyMatchingMessagesTo("", newDestination);
queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + newDestination );
queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
int movedSize = MESSAGE_COUNT;
assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize());
@ -1253,7 +1287,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
String queueName = getDestinationString();
queue.removeMatchingMessages("counter < 10");
@ -1289,7 +1323,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
CompositeData[] compdatalist = queue.browse();
int initialQueueSize = compdatalist.length;
@ -1335,6 +1369,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
ObjectName query = new ObjectName(domain + ":type=Broker,brokerName=localhost,endpoint=dynamicProducer,*");
Set<ObjectInstance> mbeans = mbeanServer.queryMBeans(query, null);
assertEquals(mbeans.size(), 1);
producer.close();
}
public void testDurableSubQuery() throws Exception {
@ -1346,5 +1381,6 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
ObjectName query = new ObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=test.topic,endpoint=Consumer,consumerId=Durable(*),*");
Set<ObjectInstance> mbeans = mbeanServer.queryMBeans(query, null);
assertEquals(mbeans.size(), 1);
sub.close();
}
}