refactor the code to make it easy to support non-local brokers such as for connecting via JMX to a remote broker

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@504235 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2007-02-06 18:48:07 +00:00
parent 3b78804dbc
commit 8a5e197cb4
18 changed files with 208 additions and 163 deletions

View File

@ -17,126 +17,30 @@
*/
package org.apache.activemq.web;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
import org.apache.activemq.broker.jmx.ManagedRegionBroker;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.jmx.TopicViewMBean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.activemq.command.ActiveMQDestination;
import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
/**
*
* A facade for either a local in JVM broker or a remote broker over JMX
*
* @version $Revision$
*/
public class BrokerFacade {
private static final Log log = LogFactory.getLog(BrokerFacade.class);
public interface BrokerFacade {
private BrokerService brokerService;
BrokerViewMBean getBrokerAdmin() throws Exception;
public BrokerFacade(BrokerService brokerService) {
this.brokerService = brokerService;
}
Collection getQueues() throws Exception;
public BrokerService getBrokerService() {
return brokerService;
}
Collection getTopics() throws Exception;
public Broker getBroker() throws Exception {
return brokerService.getBroker();
}
public ManagementContext getManagementContext() {
return brokerService.getManagementContext();
}
public BrokerViewMBean getBrokerAdmin() throws Exception {
// TODO could use JMX to look this up
return brokerService.getAdminView();
}
public ManagedRegionBroker getManagedBroker() throws Exception {
BrokerView adminView = brokerService.getAdminView();
if (adminView == null) {
return null;
}
return adminView.getBroker();
}
// TODO - we should not have to use JMX to implement the following methods...
/*
public Collection getQueues() throws Exception {
BrokerView broker = brokerService.getAdminView();
if (broker == null) {
return Collections.EMPTY_LIST;
}
ObjectName[] queues = broker.getQueues();
return getManagedObjects(queues, QueueViewMBean.class);
}
*/
public Collection getQueues() throws Exception {
ManagedRegionBroker broker = getManagedBroker();
if (broker == null) {
return new ArrayList();
}
return broker.getQueueRegion().getDestinationMap().values();
}
public Collection getTopics() throws Exception {
BrokerView broker = brokerService.getAdminView();
if (broker == null) {
return Collections.EMPTY_LIST;
}
ObjectName[] queues = broker.getTopics();
return getManagedObjects(queues, TopicViewMBean.class);
}
public Collection getDurableTopicSubscribers() throws Exception {
BrokerView broker = brokerService.getAdminView();
if (broker == null) {
return Collections.EMPTY_LIST;
}
ObjectName[] queues = broker.getDurableTopicSubscribers();
return getManagedObjects(queues, DurableSubscriptionViewMBean.class);
}
protected Collection getManagedObjects(ObjectName[] names, Class type) {
List answer = new ArrayList();
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
if (mbeanServer != null) {
for (int i = 0; i < names.length; i++) {
ObjectName name = names[i];
Object value = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, type, true);
if (value != null) {
answer.add(value);
}
}
}
return answer;
}
Collection getDurableTopicSubscribers() throws Exception;
/**
*
*
* public Collection getTopics() throws Exception { ManagedRegionBroker
* broker = getManagedBroker(); if (broker == null) { return new
* ArrayList(); } return
* broker.getTopicRegion().getDestinationMap().values(); }
* Purges the given destination
* @param destination
* @throws Exception
*/
void purgeQueue(ActiveMQDestination destination) throws Exception;
}

View File

@ -17,7 +17,7 @@
*/
package org.apache.activemq.web;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
@ -29,16 +29,16 @@ import javax.servlet.http.HttpServletRequest;
*
* @version $Revision$
*/
public class DestinationFacade extends BrokerFacade {
public class DestinationFacade {
private String JMSDestination;
private String JMSDestinationType;
private BrokerFacade brokerFacade;
public DestinationFacade(BrokerService brokerService) {
super(brokerService);
public DestinationFacade(BrokerFacade brokerFacade) {
this.brokerFacade = brokerFacade;
}
public String toString() {
return super.toString() + "[destination:" + JMSDestination + "; type=" + JMSDestinationType + "]";
}
@ -67,6 +67,14 @@ public class DestinationFacade extends BrokerFacade {
// Properties
// -------------------------------------------------------------------------
public BrokerViewMBean getBrokerAdmin() throws Exception {
return brokerFacade.getBrokerAdmin();
}
public BrokerFacade getBrokerFacade() {
return brokerFacade;
}
public boolean isQueue() {
if (JMSDestinationType != null && JMSDestinationType.equalsIgnoreCase("topic")) {
return false;
@ -109,7 +117,7 @@ public class DestinationFacade extends BrokerFacade {
protected ModelAndView redirectToRequest(HttpServletRequest request) {
String view = "redirect:" + request.getRequestURI();
System.out.println("Redirecting to: " + view);
// System.out.println("Redirecting to: " + view);
return new ModelAndView(view);
}

View File

@ -17,8 +17,6 @@
*/
package org.apache.activemq.web;
import org.apache.activemq.broker.BrokerService;
/**
*
* @version $Revision$
@ -28,8 +26,8 @@ public class DurableSubscriberFacade extends DestinationFacade {
private String clientId;
private String subscriberName;
public DurableSubscriberFacade(BrokerService brokerService) {
super(brokerService);
public DurableSubscriberFacade(BrokerFacade brokerFacade) {
super(brokerFacade);
}
public String getClientId() {

View File

@ -0,0 +1,152 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.web;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
import org.apache.activemq.broker.jmx.ManagedRegionBroker;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.jmx.TopicViewMBean;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
/**
* An implementation of {@link BrokerFacade} which uses a local in JVM broker
*
* @version $Revision$
*/
public class LocalBrokerFacade implements BrokerFacade {
private static final Log log = LogFactory.getLog(LocalBrokerFacade.class);
private BrokerService brokerService;
public LocalBrokerFacade(BrokerService brokerService) {
this.brokerService = brokerService;
}
public BrokerService getBrokerService() {
return brokerService;
}
public Broker getBroker() throws Exception {
return brokerService.getBroker();
}
public ManagementContext getManagementContext() {
return brokerService.getManagementContext();
}
public BrokerViewMBean getBrokerAdmin() throws Exception {
// TODO could use JMX to look this up
return brokerService.getAdminView();
}
public ManagedRegionBroker getManagedBroker() throws Exception {
BrokerView adminView = brokerService.getAdminView();
if (adminView == null) {
return null;
}
return adminView.getBroker();
}
// TODO - we should not have to use JMX to implement the following methods...
/*
public Collection getQueues() throws Exception {
BrokerView broker = brokerService.getAdminView();
if (broker == null) {
return Collections.EMPTY_LIST;
}
ObjectName[] queues = broker.getQueues();
return getManagedObjects(queues, QueueViewMBean.class);
}
*/
public Collection getQueues() throws Exception {
ManagedRegionBroker broker = getManagedBroker();
if (broker == null) {
return new ArrayList();
}
return broker.getQueueRegion().getDestinationMap().values();
}
public Collection getTopics() throws Exception {
BrokerView broker = brokerService.getAdminView();
if (broker == null) {
return Collections.EMPTY_LIST;
}
ObjectName[] queues = broker.getTopics();
return getManagedObjects(queues, TopicViewMBean.class);
}
public Collection getDurableTopicSubscribers() throws Exception {
BrokerView broker = brokerService.getAdminView();
if (broker == null) {
return Collections.EMPTY_LIST;
}
ObjectName[] queues = broker.getDurableTopicSubscribers();
return getManagedObjects(queues, DurableSubscriptionViewMBean.class);
}
protected Collection getManagedObjects(ObjectName[] names, Class type) {
List answer = new ArrayList();
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
if (mbeanServer != null) {
for (int i = 0; i < names.length; i++) {
ObjectName name = names[i];
Object value = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, type, true);
if (value != null) {
answer.add(value);
}
}
}
return answer;
}
/**
*
*
* public Collection getTopics() throws Exception { ManagedRegionBroker
* broker = getManagedBroker(); if (broker == null) { return new
* ArrayList(); } return
* broker.getTopicRegion().getDestinationMap().values(); }
*/
public void purgeQueue(ActiveMQDestination destination) throws Exception {
Set destinations = getManagedBroker().getQueueRegion().getDestinations(destination);
for (Iterator i=destinations.iterator(); i.hasNext();) {
Queue regionQueue = (Queue)i.next();
regionQueue.purge();
}
}
}

View File

@ -17,18 +17,14 @@
*/
package org.apache.activemq.web;
import org.apache.activemq.broker.BrokerService;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.TextMessage;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
* Allow the user to browse a message on a queue by its ID
@ -40,8 +36,8 @@ public class MessageQuery extends QueueBrowseQuery {
private String id;
private Message message;
public MessageQuery(BrokerService brokerService, SessionPool sessionPool) throws JMSException {
super(brokerService, sessionPool);
public MessageQuery(BrokerFacade brokerFacade, SessionPool sessionPool) throws JMSException {
super(brokerFacade, sessionPool);
}
public String getId() {

View File

@ -17,7 +17,6 @@
*/
package org.apache.activemq.web;
import org.apache.activemq.broker.BrokerService;
import org.springframework.beans.factory.DisposableBean;
import javax.jms.JMSException;
@ -36,8 +35,8 @@ public class QueueBrowseQuery extends DestinationFacade implements DisposableBea
private Queue queue;
private QueueBrowser browser;
public QueueBrowseQuery(BrokerService brokerService, SessionPool sessionPool) throws JMSException {
super(brokerService);
public QueueBrowseQuery(BrokerFacade brokerFacade, SessionPool sessionPool) throws JMSException {
super(brokerFacade);
this.sessionPool = sessionPool;
this.session = sessionPool.borrowSession();
setJMSDestinationType("query");

View File

@ -22,7 +22,6 @@ import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Session;
import java.util.LinkedList;
/**

View File

@ -17,9 +17,8 @@
*/
package org.apache.activemq.web.controller;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.web.BrokerFacade;
import org.apache.activemq.web.DestinationFacade;
import org.apache.activemq.web.DurableSubscriberFacade;
import org.springframework.web.servlet.ModelAndView;
import org.springframework.web.servlet.mvc.Controller;
@ -32,8 +31,8 @@ import javax.servlet.http.HttpServletResponse;
*/
public class CreateDestination extends DestinationFacade implements Controller {
public CreateDestination(BrokerService brokerService) {
super(brokerService);
public CreateDestination(BrokerFacade brokerFacade) {
super(brokerFacade);
}
public ModelAndView handleRequest(HttpServletRequest request, HttpServletResponse response) throws Exception {

View File

@ -17,7 +17,7 @@
*/
package org.apache.activemq.web.controller;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.web.BrokerFacade;
import org.apache.activemq.web.DurableSubscriberFacade;
import org.springframework.web.servlet.ModelAndView;
import org.springframework.web.servlet.mvc.Controller;
@ -32,8 +32,8 @@ import javax.servlet.http.HttpServletResponse;
public class CreateSubscriber extends DurableSubscriberFacade implements Controller {
private String selector;
public CreateSubscriber(BrokerService brokerService) {
super(brokerService);
public CreateSubscriber(BrokerFacade brokerFacade) {
super(brokerFacade);
}
public String getSelector() {

View File

@ -17,9 +17,8 @@
*/
package org.apache.activemq.web.controller;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.web.BrokerFacade;
import org.apache.activemq.web.DestinationFacade;
import org.apache.activemq.web.DurableSubscriberFacade;
import org.springframework.web.servlet.ModelAndView;
import org.springframework.web.servlet.mvc.Controller;
@ -32,8 +31,8 @@ import javax.servlet.http.HttpServletResponse;
*/
public class DeleteDestination extends DestinationFacade implements Controller {
public DeleteDestination(BrokerService brokerService) {
super(brokerService);
public DeleteDestination(BrokerFacade brokerFacade) {
super(brokerFacade);
}
public ModelAndView handleRequest(HttpServletRequest request, HttpServletResponse response) throws Exception {

View File

@ -17,7 +17,7 @@
*/
package org.apache.activemq.web.controller;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.web.BrokerFacade;
import org.apache.activemq.web.DurableSubscriberFacade;
import org.springframework.web.servlet.ModelAndView;
import org.springframework.web.servlet.mvc.Controller;
@ -31,8 +31,8 @@ import javax.servlet.http.HttpServletResponse;
*/
public class DeleteSubscriber extends DurableSubscriberFacade implements Controller {
public DeleteSubscriber(BrokerService brokerService) {
super(brokerService);
public DeleteSubscriber(BrokerFacade brokerFacade) {
super(brokerFacade);
}
public ModelAndView handleRequest(HttpServletRequest request, HttpServletResponse response) throws Exception {

View File

@ -17,16 +17,13 @@
*/
package org.apache.activemq.web.controller;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.web.BrokerFacade;
import org.apache.activemq.web.DestinationFacade;
import org.springframework.web.servlet.ModelAndView;
import org.springframework.web.servlet.mvc.Controller;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.Set;
import java.util.Iterator;
/**
*
@ -34,8 +31,8 @@ import java.util.Iterator;
*/
public class PurgeDestination extends DestinationFacade implements Controller {
public PurgeDestination(BrokerService brokerService) {
super(brokerService);
public PurgeDestination(BrokerFacade brokerFacade) {
super(brokerFacade);
}
public ModelAndView handleRequest(HttpServletRequest request, HttpServletResponse response) throws Exception {
@ -45,11 +42,7 @@ public class PurgeDestination extends DestinationFacade implements Controller {
public void purgeDestination() throws Exception {
if (isQueue()) {
Set destinations = getManagedBroker().getQueueRegion().getDestinations(createDestination());
for (Iterator i=destinations.iterator(); i.hasNext();) {
Queue regionQueue = (Queue)i.next();
regionQueue.purge();
}
getBrokerFacade().purgeQueue(createDestination());
}
else {
throw new UnsupportedOperationException("Purge supported for queues only. Receieved JMSDestinationType=" + getJMSDestinationType());

View File

@ -17,8 +17,8 @@
*/
package org.apache.activemq.web.controller;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.web.BrokerFacade;
import org.apache.activemq.web.DestinationFacade;
import org.apache.activemq.web.WebClient;
import org.springframework.web.servlet.ModelAndView;
@ -28,7 +28,6 @@ import javax.jms.JMSException;
import javax.jms.Message;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.Iterator;
import java.util.Map;
@ -50,8 +49,8 @@ public class SendMessage extends DestinationFacade implements Controller {
private String JMSMessageCountHeader = "JMSXMessageNumber";
private boolean redirectToBrowse;
public SendMessage(BrokerService brokerService) {
super(brokerService);
public SendMessage(BrokerFacade brokerFacade) {
super(brokerFacade);
}
public ModelAndView handleRequest(HttpServletRequest request, HttpServletResponse response) throws Exception {

View File

@ -28,7 +28,6 @@ import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.Collections;

View File

@ -17,10 +17,10 @@
*/
package org.apache.activemq.web.handler;
import org.springframework.web.bind.ServletRequestDataBinder;
import org.springframework.web.servlet.handler.BeanNameUrlHandlerMapping;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.web.bind.ServletRequestDataBinder;
import org.springframework.web.servlet.handler.BeanNameUrlHandlerMapping;
import javax.servlet.http.HttpServletRequest;

View File

@ -27,7 +27,7 @@
<bean id="sessionPool" class="org.apache.activemq.web.SessionPool"/>
<bean id="brokerQuery" class="org.apache.activemq.web.BrokerFacade" autowire='constructor' singleton="false"/>
<bean id="brokerQuery" class="org.apache.activemq.web.LocalBrokerFacade" autowire='constructor' singleton="false"/>
<bean id="queueBrowser" class="org.apache.activemq.web.QueueBrowseQuery" autowire='constructor' singleton="false"/>
<bean id="messageQuery" class="org.apache.activemq.web.MessageQuery" autowire='constructor' singleton="false"/>

View File

@ -88,7 +88,7 @@
<div id="wrapper-footer">
<div id="footer">
<p>
Copyright 2005-2006 The Apache Software Foundation
Copyright 2005-2007 The Apache Software Foundation
</p>
<p>

View File

@ -42,7 +42,7 @@ function drawGraph() {
var layout = new PlotKit.Layout("bar", options);
layout.addDataset("sqrt", [<c:forEach items="${requestContext.brokerQuery.queues}" var="row" varStatus="status"><c:if
test="${status.count > 1}">, </c:if> [${status.count}, ${row.queueSize}] </c:forEach> ]);
test="${status.count > 1}">, </c:if> [${status.count}, ${row.destinationStatistics.messages.count}] </c:forEach> ]);
layout.evaluate();
var canvas = MochiKit.DOM.getElement("graph");