AMQ-5104: Add non-durable subscribers to the subscriber page, and topic-specific subscriber view for individual topics.

This commit is contained in:
artnaseef 2014-03-14 23:10:13 -07:00
parent db1e3fc9e3
commit ae2504ab8e
7 changed files with 230 additions and 8 deletions

View File

@ -23,5 +23,6 @@
<bean id="queueConsumerQuery" class="org.apache.activemq.web.QueueConsumerQuery" autowire="constructor" destroy-method="destroy" scope="request"/>
<bean id="queueProducerQuery" class="org.apache.activemq.web.QueueProducerQuery" autowire="constructor" destroy-method="destroy" scope="request"/>
<bean id="topicProducerQuery" class="org.apache.activemq.web.TopicProducerQuery" autowire="constructor" destroy-method="destroy" scope="request"/>
<bean id="topicSubscriberQuery" class="org.apache.activemq.web.TopicSubscriberQuery" autowire="constructor" destroy-method="destroy" scope="request"/>
<bean id="connectionQuery" class="org.apache.activemq.web.ConnectionQuery" autowire="constructor" destroy-method="destroy" scope="request"/>
</beans>

View File

@ -96,7 +96,11 @@
<tbody>
<c:forEach items="${requestContext.brokerQuery.durableTopicSubscribers}" var="row">
<tr>
<td><form:tooltip text="${row.clientId}" length="10"/></td>
<td>
<a href="<c:url value="connection.jsp?connectionID=${row.clientId}"/>">
<form:tooltip text="${row.clientId}" length="10"/>
</a>
</td>
<td><form:tooltip text="${row.subscriptionName}" length="10"/></td>
<td><form:tooltip text="${row.connectionId}" length="10"/></td>
<td><form:tooltip text="${row.destinationName}" length="10"/></td>
@ -140,7 +144,11 @@
<tbody>
<c:forEach items="${requestContext.brokerQuery.inactiveDurableTopicSubscribers}" var="row">
<tr>
<td><form:tooltip text="${row.clientId}" length="10"/></td>
<td>
<a href="<c:url value="connection.jsp?connectionID=${row.clientId}"/>">
<form:tooltip text="${row.clientId}" length="10"/>
</a>
</td>
<td><form:tooltip text="${row.subscriptionName}" length="10"/></td>
<td><form:tooltip text="${row.connectionId}" length="10"/></td>
<td><form:tooltip text="${row.destinationName}" length="10"/></td>
@ -162,6 +170,52 @@
</tbody>
</table>
<h2>Active Non-Durable Topic Subscribers</h2>
<table id="topics" class="sortable autostripe">
<thead>
<tr>
<th>Client ID</th>
<th>Subscription Name</th>
<th>Connection ID</th>
<th>Destination</th>
<th>Selector</th>
<th>Pending Queue Size</th>
<th>Dispatched Queue Size</th>
<th>Dispatched Counter</th>
<th>Enqueue Counter</th>
<th>Dequeue Counter</th>
<th>Operations</th>
</tr>
</thead>
<tbody>
<c:forEach items="${requestContext.brokerQuery.nonDurableTopicSubscribers}" var="row">
<tr>
<td>
<a href="<c:url value="connection.jsp?connectionID=${row.clientId}"/>">
<form:tooltip text="${row.clientId}" length="10"/>
</a>
</td>
<td><form:tooltip text="${row.subscriptionName}" length="10"/></td>
<td><form:tooltip text="${row.connectionId}" length="10"/></td>
<td><form:tooltip text="${row.destinationName}" length="10"/></td>
<td><c:out value="${row.selector}"/></td>
<td><c:out value="${row.pendingQueueSize}" /></td>
<td><c:out value="${row.dispatchedQueueSize}" /></td>
<td><c:out value="${row.dispatchedCounter}" /></td>
<td><c:out value="${row.enqueueCounter}" /></td>
<td><c:out value="${row.dequeueCounter}" /></td>
<td>
</td>
</tr>
</c:forEach>
</tbody>
</table>
<%@include file="decorators/footer.jsp" %>
</body>

View File

@ -0,0 +1,78 @@
<%--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
--%>
<html>
<head>
<c:set var="pageTitle" value="Subscribers for Topic ${requestContext.topicSubscriberQuery.JMSDestination}"/>
<%@include file="decorators/head.jsp" %>
</head>
<body>
<%@include file="decorators/header.jsp" %>
<h2>Active Subscribers for <c:out value="${requestContext.topicSubscriberQuery.JMSDestination}" /></h2>
<table id="producers" class="sortable autostripe">
<thead>
<tr>
<th>
<span>Client ID</span>
<br/>
<span>Connection ID</span>
</th>
<th>SessionId</th>
<th>SubscriptionId</th>
<th>Selector</th>
<th>Active</th>
<th>Network</th>
<th>Pending Queue Size</th>
<th>Inflight</th>
<th>Enqueued</th>
<th>Dequeued</th>
<th>Prefetch</th>
<th>Subscription Name</th>
</tr>
</thead>
<tbody>
<c:forEach items="${requestContext.topicSubscriberQuery.subscribers}" var="row">
<tr>
<td>
<a href="<c:url value="connection.jsp?connectionID=${row.clientId}"/>"><c:out value="${row.clientId}" /></a><br/>
<br>
<c:out value="${row.connectionId}" />
</td>
<td><c:out value="${row.sessionId}" /></td>
<td><c:out value="${row.subscriptionId}" /></td>
<td><c:out value="${row.selector}" /></td>
<td><c:out value="${row.active}" /></td>
<td><c:out value="${row.network}" /></td>
<td><c:out value="${row.pendingQueueSize}" /></td>
<td><c:out value="${row.messageCountAwaitingAcknowledge}" /></td>
<td><c:out value="${row.enqueueCounter}" /></td>
<td><c:out value="${row.dequeueCounter}" /></td>
<td><c:out value="${row.prefetchSize}" /></td>
<td><c:out value="${row.subscriptionName}" /></td>
</tr>
</c:forEach>
</tbody>
</table>
<%@include file="decorators/footer.jsp" %>
</body>
</html>

View File

@ -61,6 +61,8 @@
<a href="<c:url value="send.jsp">
<c:param name="JMSDestination" value="${row.name}" />
<c:param name="JMSDestinationType" value="topic"/></c:url>">Send To</a>
<a href="<c:url value="topicSubscribers.jsp">
<c:param name="JMSDestination" value="${row.name}" /></c:url>">Active Subscribers</a><br/>
<a href="<c:url value="topicProducers.jsp">
<c:param name="JMSDestination" value="${row.name}" /></c:url>">Active Producers</a><br/>
<a href="<c:url value="deleteDestination.action">

View File

@ -94,6 +94,28 @@ public interface BrokerFacade {
Collection<ProducerViewMBean> getTopicProducers(String queueName)
throws Exception;
/**
* All active non-durable subscribers to a topic.
*
* @param queueName
* the name of the topic, not <code>null</code>
* @return not <code>null</code>
* @throws Exception
*/
public Collection<SubscriptionViewMBean> getTopicSubscribers(String topicName)
throws Exception;
/**
* All active non-durable subscribers to a topic.
*
* @param queueName
* the name of the topic, not <code>null</code>
* @return not <code>null</code>
* @throws Exception
*/
public Collection<SubscriptionViewMBean> getNonDurableTopicSubscribers()
throws Exception;
/**
* Active durable subscribers to topics of the broker.
*

View File

@ -69,8 +69,28 @@ public abstract class BrokerFacadeSupport implements BrokerFacade {
if (broker == null) {
return Collections.EMPTY_LIST;
}
ObjectName[] queues = broker.getTopics();
return getManagedObjects(queues, TopicViewMBean.class);
ObjectName[] topics = broker.getTopics();
return getManagedObjects(topics, TopicViewMBean.class);
}
@Override
public Collection<SubscriptionViewMBean> getTopicSubscribers(String topicName) throws Exception {
String brokerName = getBrokerName();
topicName = StringUtils.replace(topicName, "\"", "_");
ObjectName query = new ObjectName("org.apache.activemq:type=Broker,brokerName=" + brokerName
+ ",destinationType=Topic,destinationName=" + topicName + ",endpoint=Consumer,*");
Set<ObjectName> queryResult = queryNames(query, null);
return getManagedObjects(queryResult.toArray(new ObjectName[queryResult.size()]), SubscriptionViewMBean.class);
}
@Override
public Collection<SubscriptionViewMBean> getNonDurableTopicSubscribers() throws Exception {
BrokerViewMBean broker = getBrokerAdmin();
if (broker == null) {
return Collections.EMPTY_LIST;
}
ObjectName[] subscribers = broker.getTopicSubscribers();
return getManagedObjects(subscribers, SubscriptionViewMBean.class);
}
@Override
@ -79,8 +99,8 @@ public abstract class BrokerFacadeSupport implements BrokerFacade {
if (broker == null) {
return Collections.EMPTY_LIST;
}
ObjectName[] queues = broker.getDurableTopicSubscribers();
return getManagedObjects(queues, DurableSubscriptionViewMBean.class);
ObjectName[] subscribers = broker.getDurableTopicSubscribers();
return getManagedObjects(subscribers, DurableSubscriptionViewMBean.class);
}
@Override
@ -89,8 +109,8 @@ public abstract class BrokerFacadeSupport implements BrokerFacade {
if (broker == null) {
return Collections.EMPTY_LIST;
}
ObjectName[] queues = broker.getInactiveDurableTopicSubscribers();
return getManagedObjects(queues, DurableSubscriptionViewMBean.class);
ObjectName[] subscribers = broker.getInactiveDurableTopicSubscribers();
return getManagedObjects(subscribers, DurableSubscriptionViewMBean.class);
}
@Override

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.web;
import java.util.Collection;
import javax.jms.JMSException;
import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
/**
* Query for Topic producers.
*
*
*/
public class TopicSubscriberQuery extends DestinationFacade {
public TopicSubscriberQuery(BrokerFacade brokerFacade) throws JMSException {
super(brokerFacade);
setJMSDestinationType("queue");
}
public Collection<SubscriptionViewMBean> getSubscribers() throws Exception {
return getBrokerFacade().getTopicSubscribers(getJMSDestination());
}
public void destroy() {
// empty
}
}