moved the web helper classes to activemq-web so that they are reusable in other web apps easily

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@569067 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2007-08-23 16:42:34 +00:00
parent d97054b1c2
commit 5a5056e73b
10 changed files with 535 additions and 0 deletions

View File

@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.web;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.jmx.TopicViewMBean;
/**
* A useful base class for an implementation of {@link BrokerFacade}
*
* @version $Revision$
*/
public abstract class BrokerFacadeSupport implements BrokerFacade {
public abstract ManagementContext getManagementContext();
public Collection<Object> getQueues() throws Exception {
BrokerViewMBean broker = getBrokerAdmin();
if (broker == null) {
return Collections.EMPTY_LIST;
}
ObjectName[] queues = broker.getQueues();
return getManagedObjects(queues, QueueViewMBean.class);
}
public Collection<Object> getTopics() throws Exception {
BrokerViewMBean broker = getBrokerAdmin();
if (broker == null) {
return Collections.EMPTY_LIST;
}
ObjectName[] queues = broker.getTopics();
return getManagedObjects(queues, TopicViewMBean.class);
}
public Collection<Object> getDurableTopicSubscribers() throws Exception {
BrokerViewMBean broker = getBrokerAdmin();
if (broker == null) {
return Collections.EMPTY_LIST;
}
ObjectName[] queues = broker.getDurableTopicSubscribers();
return getManagedObjects(queues, DurableSubscriptionViewMBean.class);
}
public QueueViewMBean getQueue(String name) throws Exception {
return (QueueViewMBean) getDestinationByName(getQueues(), name);
}
public TopicViewMBean getTopic(String name) throws Exception {
return (TopicViewMBean) getDestinationByName(getTopics(), name);
}
protected DestinationViewMBean getDestinationByName(Collection<Object> collection, String name) {
Iterator<Object> iter = collection.iterator();
while (iter.hasNext()) {
DestinationViewMBean destinationViewMBean = (DestinationViewMBean) iter.next();
if (name.equals(destinationViewMBean.getName())) {
return destinationViewMBean;
}
}
return null;
}
protected Collection<Object> getManagedObjects(ObjectName[] names, Class type) {
List<Object> answer = new ArrayList<Object>();
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
if (mbeanServer != null) {
for (int i = 0; i < names.length; i++) {
ObjectName name = names[i];
Object value = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, type, true);
if (value != null) {
answer.add(value);
}
}
}
return answer;
}
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.web;
/**
*
* @version $Revision$
*/
public class DurableSubscriberFacade extends DestinationFacade {
private String clientId;
private String subscriberName;
public DurableSubscriberFacade(BrokerFacade brokerFacade) {
super(brokerFacade);
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public String getSubscriberName() {
return subscriberName;
}
public void setSubscriberName(String subscriberName) {
this.subscriberName = subscriberName;
}
}

View File

@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.web;
import java.util.Iterator;
import java.util.Set;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.ManagedRegionBroker;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.ActiveMQDestination;
/**
* An implementation of {@link BrokerFacade} which uses a local in JVM broker
*
* @version $Revision$
*/
public class LocalBrokerFacade extends BrokerFacadeSupport {
private BrokerService brokerService;
public LocalBrokerFacade(BrokerService brokerService) {
this.brokerService = brokerService;
}
public BrokerService getBrokerService() {
return brokerService;
}
public Broker getBroker() throws Exception {
return brokerService.getBroker();
}
public ManagementContext getManagementContext() {
return brokerService.getManagementContext();
}
public BrokerViewMBean getBrokerAdmin() throws Exception {
// TODO could use JMX to look this up
return brokerService.getAdminView();
}
public ManagedRegionBroker getManagedBroker() throws Exception {
BrokerView adminView = brokerService.getAdminView();
if (adminView == null) {
return null;
}
return adminView.getBroker();
}
public void purgeQueue(ActiveMQDestination destination) throws Exception {
Set destinations = getManagedBroker().getQueueRegion().getDestinations(destination);
for (Iterator i = destinations.iterator(); i.hasNext();) {
Queue regionQueue = (Queue)i.next();
regionQueue.purge();
}
}
}

View File

@ -0,0 +1,112 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.web;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.TextMessage;
/**
* Allow the user to browse a message on a queue by its ID
*
* @version $Revision$
*/
public class MessageQuery extends QueueBrowseQuery {
private String id;
private Message message;
public MessageQuery(BrokerFacade brokerFacade, SessionPool sessionPool) throws JMSException {
super(brokerFacade, sessionPool);
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public void setMessage(Message message) {
this.message = message;
}
public Message getMessage() throws JMSException {
if (message == null) {
if (id != null) {
Enumeration iter = getBrowser().getEnumeration();
while (iter.hasMoreElements()) {
Message item = (Message) iter.nextElement();
if (id.equals(item.getJMSMessageID())) {
message = item;
break;
}
}
}
}
return message;
}
public Object getBody() throws JMSException {
Message message = getMessage();
if (message instanceof TextMessage) {
return ((TextMessage) message).getText();
}
if (message instanceof ObjectMessage) {
return ((ObjectMessage) message).getObject();
}
if (message instanceof MapMessage) {
return createMapBody((MapMessage) message);
}
return null;
}
public Map<String, Object> getPropertiesMap() throws JMSException {
Map<String, Object> answer = new HashMap<String, Object>();
Message aMessage = getMessage();
Enumeration iter = aMessage.getPropertyNames();
while (iter.hasMoreElements()) {
String name = (String) iter.nextElement();
Object value = aMessage.getObjectProperty(name);
if (value != null) {
answer.put(name, value);
}
}
return answer;
}
protected Map<String, Object> createMapBody(MapMessage mapMessage) throws JMSException {
Map<String, Object> answer = new HashMap<String, Object>();
Enumeration iter = mapMessage.getMapNames();
while (iter.hasMoreElements()) {
String name = (String) iter.nextElement();
Object value = mapMessage.getObject(name);
if (value != null) {
answer.put(name, value);
}
}
return answer;
}
}

View File

@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.web;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import org.springframework.beans.factory.DisposableBean;
/**
*
* @version $Revision$
*/
public class QueueBrowseQuery extends DestinationFacade implements DisposableBean {
private SessionPool sessionPool;
private String selector;
private Session session;
private Queue queue;
private QueueBrowser browser;
public QueueBrowseQuery(BrokerFacade brokerFacade, SessionPool sessionPool) throws JMSException {
super(brokerFacade);
this.sessionPool = sessionPool;
this.session = sessionPool.borrowSession();
setJMSDestinationType("query");
}
public void destroy() throws Exception {
if (browser != null) {
browser.close();
}
sessionPool.returnSession(session);
session = null;
}
public QueueBrowser getBrowser() throws JMSException {
if (browser == null) {
browser = createBrowser();
}
return browser;
}
public void setBrowser(QueueBrowser browser) {
this.browser = browser;
}
public Queue getQueue() throws JMSException {
if (queue == null) {
queue = session.createQueue(getValidDestination());
}
return queue;
}
public void setQueue(Queue queue) {
this.queue = queue;
}
public String getSelector() {
return selector;
}
public void setSelector(String selector) {
this.selector = selector;
}
public Session getSession() {
return session;
}
public boolean isQueue() {
return true;
}
protected QueueBrowser createBrowser() throws JMSException {
return getSession().createBrowser(getQueue(), getSelector());
}
}

View File

@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.web;
import java.util.LinkedList;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Session;
/**
* A simple pool of JMS Session objects intended for use by Queue browsers.
*
* @version $Revision$
*/
public class SessionPool {
private ConnectionFactory connectionFactory;
private Connection connection;
private LinkedList<Session> sessions = new LinkedList<Session>();
public Connection getConnection() throws JMSException {
if (checkConnection()) {
return connection;
}
synchronized (this) {
connection = getConnectionFactory().createConnection();
connection.start();
return connection;
}
}
private boolean checkConnection() {
if (connection == null) {
return false;
}
try {
connection.getMetaData();
return true;
} catch (JMSException e) {
return false;
}
}
public void setConnection(Connection connection) {
this.connection = connection;
}
public ConnectionFactory getConnectionFactory() {
if (connectionFactory == null) {
throw new IllegalStateException("No ConnectionFactory has been set for the session pool");
}
return connectionFactory;
}
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
public Session borrowSession() throws JMSException {
Session answer = null;
synchronized (sessions) {
if (sessions.isEmpty()) {
answer = createSession();
} else {
answer = sessions.removeLast();
}
}
return answer;
}
public void returnSession(Session session) {
if (session != null) {
synchronized (sessions) {
sessions.add(session);
}
}
}
protected Session createSession() throws JMSException {
return getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
}
}