added support for RESTful browsing of message queues using a web container; either as XML or as RSS/Atom feeds. Fixes AMQ-666 - crikey thats a bad number for an AMQ issue :)

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@389482 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-03-28 11:57:51 +00:00
parent 3263205bf9
commit d754e76c50
15 changed files with 669 additions and 2 deletions

View File

@ -74,6 +74,9 @@
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>${derby_version}</version>
<properties>
<war.bundle>true</war.bundle>
</properties>
</dependency>
<!-- For Spring servlet -->
@ -91,6 +94,41 @@
<groupId>${pom.groupId}</groupId>
<artifactId>activeio-core</artifactId>
<version>${activeio_version}</version>
<properties>
<war.bundle>true</war.bundle>
</properties>
</dependency>
<!-- Rome RSS Reader -->
<dependency>
<id>rome</id>
<version>${rome_version}</version>
<properties>
<war.bundle>true</war.bundle>
</properties>
</dependency>
<dependency>
<id>jdom</id>
<version>${jdom_version}</version>
<properties>
<war.bundle>true</war.bundle>
</properties>
</dependency>
<dependency>
<groupId>xstream</groupId>
<artifactId>xstream</artifactId>
<version>${xstream_version}</version>
<properties>
<war.bundle>true</war.bundle>
</properties>
</dependency>
<dependency>
<groupId>xmlpull</groupId>
<artifactId>xmlpull</artifactId>
<version>${xmlpull_version}</version>
</dependency>
<!-- optional used for in-web container testing -->

View File

@ -185,11 +185,30 @@ public class MessageListenerServlet extends MessageServletSupport {
}
else
{
// lets assume a simple POST of a message
/*
response.setContentType("text/xml");
response.setHeader("Cache-Control", "no-cache");
response.getWriter().print("<ajax-response></ajax-response>");
*/
try {
Destination destination=getDestination(client, request);
String body = getPostedMessageBody(request);
TextMessage message = client.getSession().createTextMessage(body );
client.send(destination, message);
if (log.isDebugEnabled()) {
log.debug("Sent to destination: " + destination + " body: " + body);
}
response.setContentType("text");
response.setHeader("Cache-Control", "no-cache");
response.getWriter().print(message.getJMSMessageID());
}
catch (JMSException e) {
throw new ServletException(e);
}
}
// System.err.println("==");
}
/**

View File

@ -0,0 +1,38 @@
/*
* Copyright 2005-2006 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.web;
import javax.servlet.ServletException;
/**
*
* @version $Revision: $
*/
public class NoSuchViewStyleException extends ServletException {
private static final long serialVersionUID = -3590398087507019767L;
private final String style;
public NoSuchViewStyleException(String style, Throwable cause) {
super("The view style '" + style + "' could not be created", cause);
this.style = style;
}
public String getStyle() {
return style;
}
}

View File

@ -0,0 +1,182 @@
/*
* Copyright 2005-2006 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.web;
import org.apache.activeio.util.FactoryFinder;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.web.view.MessageRenderer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
/**
* Renders the contents of a queue using some kind of view. The URI is assumed
* to be the queue. The following parameters can be used
*
* <ul>
* <li>view - specifies the type of the view such as simple, xml, rss</li>
* <li>selector - specifies the SQL 92 selector to apply to the queue</li>
* </ul>
*
* @version $Revision: $
*/
public class QueueBrowseServlet extends HttpServlet {
private static FactoryFinder factoryFinder = new FactoryFinder("META-INF/services/org/apache/activemq/web/view/");
private ConnectionFactory connectionFactory;
private Connection connection;
private LinkedList sessions = new LinkedList();
public Connection getConnection() throws JMSException {
if (connection == null) {
connection = getConnectionFactory().createConnection();
connection.start();
}
return connection;
}
public void setConnection(Connection connection) {
this.connection = connection;
}
public ConnectionFactory getConnectionFactory() {
if (connectionFactory == null) {
// TODO support remote brokers too
connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
}
return connectionFactory;
}
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
// Implementation methods
// -------------------------------------------------------------------------
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
Session session = null;
try {
session = borrowSession();
Queue queue = getQueue(request, session);
if (queue == null) {
throw new ServletException("No queue URI specified");
}
String selector = getSelector(request);
QueueBrowser browser = session.createBrowser(queue, selector);
MessageRenderer renderer = getMessageRenderer(request);
configureRenderer(request, renderer);
renderer.renderMessages(request, response, browser);
}
catch (JMSException e) {
throw new ServletException(e);
}
finally {
returnSession(session);
}
}
protected MessageRenderer getMessageRenderer(HttpServletRequest request) throws IOException, ServletException {
String style = request.getParameter("view");
if (style == null) {
style = "simple";
}
try {
return (MessageRenderer) factoryFinder.newInstance(style);
}
catch (IllegalAccessException e) {
throw new NoSuchViewStyleException(style, e);
}
catch (InstantiationException e) {
throw new NoSuchViewStyleException(style, e);
}
catch (ClassNotFoundException e) {
throw new NoSuchViewStyleException(style, e);
}
}
protected void configureRenderer(HttpServletRequest request, MessageRenderer renderer) {
Map properties = new HashMap();
for (Enumeration iter = request.getParameterNames(); iter.hasMoreElements(); ) {
String name = (String) iter.nextElement();
properties.put(name, request.getParameter(name));
}
IntrospectionSupport.setProperties(renderer, properties);
}
protected Session borrowSession() throws JMSException {
Session answer = null;
synchronized (sessions) {
if (sessions.isEmpty()) {
answer = createSession();
}
else {
answer = (Session) sessions.removeLast();
}
}
return answer;
}
protected void returnSession(Session session) {
if (session != null) {
synchronized (sessions) {
sessions.add(session);
}
}
}
protected Session createSession() throws JMSException {
return getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
}
protected String getSelector(HttpServletRequest request) {
return request.getParameter("selector");
}
protected Queue getQueue(HttpServletRequest request, Session session) throws JMSException {
String uri = request.getPathInfo();
if (uri == null)
return null;
// replace URI separator with JMS destination separator
if (uri.startsWith("/")) {
uri = uri.substring(1);
if (uri.length() == 0)
return null;
}
uri = uri.replace('/', '.');
System.out.println("destination uri = " + uri);
return session.createQueue(uri);
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright 2005-2006 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.web.view;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.QueueBrowser;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
/**
* Represents a rendering of the messages
*
* @version $Revision: $
*/
public interface MessageRenderer {
public void renderMessages(HttpServletRequest request, HttpServletResponse response, QueueBrowser browser)
throws IOException, JMSException, ServletException;
public void renderMessage(PrintWriter writer, HttpServletRequest request, HttpServletResponse response,
QueueBrowser browser, Message message) throws JMSException, ServletException;
}

View File

@ -0,0 +1,153 @@
/*
* Copyright 2005-2006 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.web.view;
import com.sun.syndication.feed.synd.SyndContent;
import com.sun.syndication.feed.synd.SyndContentImpl;
import com.sun.syndication.feed.synd.SyndEntry;
import com.sun.syndication.feed.synd.SyndEntryImpl;
import com.sun.syndication.feed.synd.SyndFeed;
import com.sun.syndication.feed.synd.SyndFeedImpl;
import com.sun.syndication.io.FeedException;
import com.sun.syndication.io.SyndFeedOutput;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.QueueBrowser;
import javax.jms.TextMessage;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Date;
import java.util.List;
/**
* This renderer uses XStream to render messages on a queue as full XML elements
*
* @version $Revision: $
*/
public class RssMessageRenderer extends SimpleMessageRenderer {
//private String feedType = "atom_0.3";
private String feedType = "rss_2.0";
private SyndFeed feed;
private String description = "This feed is auto-generated by Apache ActiveMQ";
private String entryContentType = "text/plain";
public void renderMessage(PrintWriter writer, HttpServletRequest request, HttpServletResponse response,
QueueBrowser browser, Message message) throws JMSException {
SyndFeed feed = getFeed(browser, request);
List entries = feed.getEntries();
SyndEntry entry = createEntry(browser, message, request);
SyndContent description = createEntryContent(browser, message, request);
entry.setDescription(description);
entries.add(entry);
}
// Properties
// -------------------------------------------------------------------------
public String getDescription() {
return description;
}
public void setDescription(String feedDescription) {
this.description = feedDescription;
}
public String getFeedType() {
return feedType;
}
public void setFeedType(String feedType) {
this.feedType = feedType;
}
public String getEntryContentType() {
return entryContentType;
}
public void setEntryContentType(String entryContentType) {
this.entryContentType = entryContentType;
}
// Implementation methods
// -------------------------------------------------------------------------
protected void printFooter(PrintWriter writer, QueueBrowser browser, HttpServletRequest request)
throws IOException, JMSException, ServletException {
// now lets actually write out the content
SyndFeed feed = getFeed(browser, request);
SyndFeedOutput output = new SyndFeedOutput();
try {
output.output(feed, writer);
}
catch (FeedException e) {
throw new ServletException(e);
}
}
protected void printHeader(PrintWriter writer, QueueBrowser browser, HttpServletRequest request)
throws IOException, JMSException {
}
public SyndFeed getFeed(QueueBrowser browser, HttpServletRequest request) throws JMSException {
if (feed == null) {
feed = createFeed(browser, request);
}
return feed;
}
protected SyndEntry createEntry(QueueBrowser browser, Message message, HttpServletRequest request) throws JMSException {
SyndEntry entry = new SyndEntryImpl();
String title = message.getJMSMessageID();
entry.setTitle(title);
String link = request.getRequestURI() + "/" + title;
entry.setLink(link);
entry.setPublishedDate(new Date());
return entry;
}
protected SyndContent createEntryContent(QueueBrowser browser, Message message, HttpServletRequest request) throws JMSException {
SyndContent description = new SyndContentImpl();
description.setType(entryContentType);
if (message instanceof TextMessage) {
String text = ((TextMessage) message).getText();
description.setValue(text);
}
return description;
}
protected SyndFeed createFeed(QueueBrowser browser, HttpServletRequest request) throws JMSException {
SyndFeed feed = new SyndFeedImpl();
feed.setFeedType(feedType);
String title = browser.getQueue().toString();
String selector = browser.getMessageSelector();
if (selector != null) {
title += " with selector: " + selector;
}
feed.setTitle(title);
feed.setLink(request.getRequestURI());
feed.setDescription(getDescription());
return feed;
}
}

View File

@ -0,0 +1,105 @@
/*
* Copyright 2005-2006 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.web.view;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.QueueBrowser;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Enumeration;
/**
* A simple rendering of the contents of a queue appear as a list of message
* elements which just contain an ID attribute.
*
* @version $Revision: $
*/
public class SimpleMessageRenderer implements MessageRenderer {
private String contentType = "text/xml";
private int maxMessages = 0;
public void renderMessages(HttpServletRequest request, HttpServletResponse response, QueueBrowser browser)
throws IOException, JMSException, ServletException {
// lets use XML by default
response.setContentType(getContentType());
PrintWriter writer = response.getWriter();
printHeader(writer, browser, request);
Enumeration iter = browser.getEnumeration();
for (int counter = 0; iter.hasMoreElements() && (maxMessages <= 0 || counter < maxMessages); counter++) {
Message message = (Message) iter.nextElement();
renderMessage(writer, request, response, browser, message);
}
printFooter(writer, browser, request);
}
public void renderMessage(PrintWriter writer, HttpServletRequest request, HttpServletResponse response,
QueueBrowser browser, Message message) throws JMSException, ServletException {
// lets just write the message IDs for now
writer.print("<message id='");
writer.print(message.getJMSMessageID());
writer.println("'/>");
}
// Properties
// -------------------------------------------------------------------------
public int getMaxMessages() {
return maxMessages;
}
public void setMaxMessages(int maxMessages) {
this.maxMessages = maxMessages;
}
public String getContentType() {
return contentType;
}
public void setContentType(String contentType) {
this.contentType = contentType;
}
// Implementation methods
// -------------------------------------------------------------------------
protected void printHeader(PrintWriter writer, QueueBrowser browser, HttpServletRequest request)
throws IOException, JMSException, ServletException {
writer.println("");
writer.print("<messages queue='");
writer.print(browser.getQueue());
writer.print("'");
String selector = browser.getMessageSelector();
if (selector != null) {
writer.print(" selector='");
writer.print(selector);
writer.print("'");
}
writer.println(">");
}
protected void printFooter(PrintWriter writer, QueueBrowser browser, HttpServletRequest request)
throws IOException, JMSException, ServletException {
writer.println("</messages>");
}
}

View File

@ -0,0 +1,51 @@
/*
* Copyright 2005-2006 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.web.view;
import com.thoughtworks.xstream.XStream;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.QueueBrowser;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.PrintWriter;
/**
* This renderer uses XStream to render messages on a queue as full XML elements
*
* @version $Revision: $
*/
public class XmlMessageRenderer extends SimpleMessageRenderer {
private XStream xstream;
public void renderMessage(PrintWriter writer, HttpServletRequest request, HttpServletResponse response, QueueBrowser browser, Message message) throws JMSException {
getXstream().toXML(message, writer);
}
public XStream getXstream() {
if (xstream == null) {
xstream = new XStream();
}
return xstream;
}
public void setXstream(XStream xstream) {
this.xstream = xstream;
}
}

View File

@ -0,0 +1 @@
class=org.apache.activemq.web.view.RssMessageRenderer

View File

@ -0,0 +1 @@
class=org.apache.activemq.web.view.SimpleMessageRenderer

View File

@ -0,0 +1 @@
class=org.apache.activemq.web.view.XmlMessageRenderer

View File

@ -53,6 +53,12 @@
<load-on-startup>1</load-on-startup>
</servlet>
<!-- the queue browse servlet -->
<servlet>
<servlet-name>QueueBrowseServlet</servlet-name>
<servlet-class>org.apache.activemq.web.QueueBrowseServlet</servlet-class>
</servlet>
<!-- servlets for the portfolio demo -->
<servlet>
<servlet-name>PortfolioPublishServlet</servlet-name>
@ -65,6 +71,11 @@
<url-pattern>/amq/*</url-pattern>
</servlet-mapping>
<servlet-mapping>
<servlet-name>QueueBrowseServlet</servlet-name>
<url-pattern>/queueBrowse/*</url-pattern>
</servlet-mapping>
<servlet-mapping>
<servlet-name>PortfolioPublishServlet</servlet-name>
<url-pattern>/portfolioPublish</url-pattern>

View File

@ -42,5 +42,16 @@ updates in real time as the market prices change
<a href="amq/FOO/BAR?timeout=10000">Receive a message</a>
</p>
<h2>Queue browser example</h2>
<ul>
<li><a href="queueBrowse/FOO/BAR">Browse a queue</a></li>
<li><a href="queueBrowse/FOO/BAR?view=xml">Browse a queue as XML</a></li>
<li><a href="queueBrowse/FOO/BAR?view=rss&feedTyoe=atom_0.3">Browse a queue as Atom</a></li>
<li><a href="queueBrowse/FOO/BAR?view=rss&feedType=rss_1.0">Browse a queue as RSS 1.0</a></li>
<li><a href="queueBrowse/FOO/BAR?view=rss&feedType=rss_2.0">Browse a queue as RSS 2.0</a></li>
</ul>
</body>
</html>

View File

@ -8,7 +8,7 @@
<body>
<h1>Send a JMS Message</h1>
<form action="amq/FOO/BAR" method="post">
<form action="amq/FOO/BAR?topic=false" method="post">
<p>
<label for="body">Message body: </label>
</p>

View File

@ -17,6 +17,8 @@
package org.apache.activemq.web;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.demo.DefaultQueueSender;
import org.mortbay.jetty.Connector;
import org.mortbay.jetty.Handler;
import org.mortbay.jetty.Server;
@ -39,6 +41,18 @@ public class JettyServer {
public static final String WEBAPP_CTX = "/";
public static void main(String[] args) throws Exception {
// lets create a broker
BrokerService broker = new BrokerService();
broker.setPersistent(false);
broker.setUseJmx(true);
broker.addConnector("tcp://localhost:61616");
broker.addConnector("stomp://localhost:61613");
broker.start();
// lets publish some messages so that there is some stuff to browse
DefaultQueueSender.main(new String[] {"FOO.BAR"});
// now lets start the web server
int port = PORT;
if (args.length > 0) {
String text = args[0];