git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1296720 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-03-03 23:30:09 +00:00
parent 686bfcf496
commit 0efaaecfd1
4 changed files with 129 additions and 3 deletions

View File

@ -16,21 +16,36 @@
*/
package org.apache.activemq.broker.jmx;
import java.io.IOException;
import java.util.Set;
import javax.management.ObjectName;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.JMXSupport;
public class ConnectionView implements ConnectionViewMBean {
private final Connection connection;
private final ManagementContext managementContext;
private String userName;
public ConnectionView(Connection connection) {
this.connection = connection;
this(connection, null);
}
public ConnectionView(Connection connection, ManagementContext managementContext) {
this.connection = connection;
this.managementContext = managementContext;
}
@Override
public void start() throws Exception {
connection.start();
}
@Override
public void stop() throws Exception {
connection.stop();
}
@ -38,6 +53,7 @@ public class ConnectionView implements ConnectionViewMBean {
/**
* @return true if the Connection is slow
*/
@Override
public boolean isSlow() {
return connection.isSlow();
}
@ -45,6 +61,7 @@ public class ConnectionView implements ConnectionViewMBean {
/**
* @return if after being marked, the Connection is still writing
*/
@Override
public boolean isBlocked() {
return connection.isBlocked();
}
@ -52,6 +69,7 @@ public class ConnectionView implements ConnectionViewMBean {
/**
* @return true if the Connection is connected
*/
@Override
public boolean isConnected() {
return connection.isConnected();
}
@ -59,10 +77,12 @@ public class ConnectionView implements ConnectionViewMBean {
/**
* @return true if the Connection is active
*/
@Override
public boolean isActive() {
return connection.isActive();
}
@Override
public int getDispatchQueueSize() {
return connection.getDispatchQueueSize();
}
@ -70,10 +90,12 @@ public class ConnectionView implements ConnectionViewMBean {
/**
* Resets the statistics
*/
@Override
public void resetStatistics() {
connection.getStatistics().reset();
}
@Override
public String getRemoteAddress() {
return connection.getRemoteAddress();
}
@ -90,4 +112,62 @@ public class ConnectionView implements ConnectionViewMBean {
public void setUserName(String userName) {
this.userName = userName;
}
@Override
public ObjectName[] getConsumers() {
ObjectName[] result = null;
if (connection != null && managementContext != null) {
try {
ObjectName query = createConsumerQueury(connection.getConnectionId());
Set<ObjectName> names = managementContext.queryNames(query, null);
result = names.toArray(new ObjectName[0]);
} catch (Exception e) {
}
}
return result;
}
@Override
public ObjectName[] getProducers() {
ObjectName[] result = null;
if (connection != null && managementContext != null) {
try {
ObjectName query = createProducerQueury(connection.getConnectionId());
Set<ObjectName> names = managementContext.queryNames(query, null);
result = names.toArray(new ObjectName[0]);
} catch (Exception e) {
}
}
return result;
}
private ObjectName createConsumerQueury(String clientId) throws IOException {
try {
return new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName=*,"
+ "Type=Subscription,persistentMode=*,"
+ "destinationType=*,destinationName=*,"
+ "clientId=" + JMXSupport.encodeObjectNamePart(clientId) + ","
+ "consumerId=*");
} catch (Throwable e) {
throw IOExceptionSupport.create(e);
}
}
private ObjectName createProducerQueury(String clientId) throws IOException {
try {
return new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName=*,"
+ "Type=Producer,"
+ "destinationType=*,destinationName=*,"
+ "clientId=" + JMXSupport.encodeObjectNamePart(clientId) + ","
+ "producerId=*");
} catch (Throwable e) {
throw IOExceptionSupport.create(e);
}
}
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.broker.jmx;
import javax.management.ObjectName;
import org.apache.activemq.Service;
public interface ConnectionViewMBean extends Service {
@ -72,4 +74,21 @@ public interface ConnectionViewMBean extends Service {
*/
@MBeanInfo("User Name used to authorize creation of this connection")
String getUserName();
/**
* Returns the ObjectNames of all the Consumers created by this Connection.
*
* @return the ObjectNames of all Consumers created by this Connection.
*/
@MBeanInfo("The ObjectNames of all Consumers created by this Connection")
ObjectName[] getConsumers();
/**
* Returns the ObjectNames of all the Producers created by this Connection.
*
* @return the ObjectNames of all Producers created by this Connection.
*/
@MBeanInfo("The ObjectNames of all Producers created by this Connection")
ObjectName[] getProducers();
}

View File

@ -54,7 +54,7 @@ public class ManagedTransportConnection extends TransportConnection {
super(connector, transport, broker, factory);
this.managementContext = context;
this.connectorName = connectorName;
this.mbean = new ConnectionView(this);
this.mbean = new ConnectionView(this, managementContext);
this.populateUserName = broker.getBrokerService().isPopulateUserNameInMBeans();
if (managementContext.isAllowRemoteAddressInMBeanNames()) {
byAddressName = createByAddressObjectName("address", transport.getRemoteAddress());

View File

@ -900,8 +900,8 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
connection.setClientID("MBeanTest");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination queue = session.createQueue(getDestinationString() + ".Queue");
@SuppressWarnings("unused")
MessageConsumer queueConsumer = session.createConsumer(queue);
MessageProducer producer = session.createProducer(queue);
ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
@ -911,6 +911,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
assertTrue(broker.getQueueSubscribers().length == 1);
ObjectName subscriptionName = broker.getQueueSubscribers()[0];
LOG.info("Looking for Subscription: " + subscriptionName);
SubscriptionViewMBean subscriberView =
(SubscriptionViewMBean)MBeanServerInvocationHandler.newProxyInstance(
@ -918,11 +919,37 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
assertNotNull(subscriberView);
ObjectName connectionName = subscriberView.getConnection();
LOG.info("Looking for Connection: " + connectionName);
assertNotNull(connectionName);
ConnectionViewMBean connectionView =
(ConnectionViewMBean)MBeanServerInvocationHandler.newProxyInstance(
mbeanServer, connectionName, ConnectionViewMBean.class, true);
assertNotNull(connectionView);
// Our consumer plus one advisory consumer.
assertEquals(2, connectionView.getConsumers().length);
// Check that the subscription view we found earlier is in this list.
boolean found = false;
for (ObjectName name : connectionView.getConsumers()) {
if (name.equals(subscriptionName)) {
found = true;
}
}
assertTrue("We should have found: " + subscriptionName, found);
// Our producer and no others.
assertEquals(1, connectionView.getProducers().length);
// Bean should detect the updates.
queueConsumer.close();
producer.close();
Thread.sleep(200);
// Only an advisory consumers now.
assertEquals(1, connectionView.getConsumers().length);
assertEquals(0, connectionView.getProducers().length);
}
public void testUserNamePopulated() throws Exception {