New amq js component to support listen style interaction

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@372855 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gregory John Wilkins 2006-01-27 14:05:02 +00:00
parent 6a70920b20
commit 4e5012ca5d
12 changed files with 832 additions and 108 deletions

View File

@ -0,0 +1,398 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.web;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.TextMessage;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;
import org.apache.activemq.MessageAvailableConsumer;
import org.apache.activemq.MessageAvailableListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.mortbay.util.ajax.Continuation;
import org.mortbay.util.ajax.ContinuationSupport;
/**
* A servlet for sending and receiving messages to/from JMS destinations using
* HTTP POST for sending and HTTP GET for receiving. <p/> You can specify the
* destination and whether it is a topic or queue via configuration details on
* the servlet or as request parameters. <p/> For reading messages you can
* specify a readTimeout parameter to determine how long the servlet should
* block for.
*
* @version $Revision: 1.1.1.1 $
*/
public class MessageListenerServlet extends MessageServletSupport {
private static final Log log = LogFactory.getLog(MessageListenerServlet.class);
private String readTimeoutParameter = "timeout";
private long defaultReadTimeout = -1;
private long maximumReadTimeout = 20000;
private int maximumMessages = 100;
public void init() throws ServletException {
ServletConfig servletConfig = getServletConfig();
String name = servletConfig.getInitParameter("defaultReadTimeout");
if (name != null) {
defaultReadTimeout = asLong(name);
}
name = servletConfig.getInitParameter("maximumReadTimeout");
if (name != null) {
maximumReadTimeout = asLong(name);
}
name = servletConfig.getInitParameter("maximumMessages");
if (name != null) {
maximumMessages = (int) asLong(name);
}
}
/**
* Sends a message to a destination
*
* @param request
* @param response
* @throws ServletException
* @throws IOException
*/
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
// lets turn the HTTP post into a JMS Message
try {
WebClient client = getWebClient(request);
// System.err.println("POST client="+client+"
// session="+request.getSession().getId()+"
// info="+request.getPathInfo()+" query="+request.getQueryString());
String text = getPostedMessageBody(request);
// lets create the destination from the URI?
Destination destination = getDestination(client, request);
if (destination == null)
throw new NoDestinationSuppliedException();
if (log.isDebugEnabled()) {
log.debug("Sending message to: " + destination + " with text: " + text);
}
TextMessage message = client.getSession().createTextMessage(text);
appendParametersToMessage(request, message);
client.send(destination, message);
// lets return a unique URI for reliable messaging
response.setStatus(HttpServletResponse.SC_NO_CONTENT);
// System.err.println("Sent "+message+" to "+destination);
/*
* StringWriter swriter = new StringWriter(); PrintWriter writer =
* new PrintWriter(swriter);
*
* writer.println("<ajax-response><response type='object'
* id='amqSend'><amq jmsid='"+message.getJMSMessageID()+"'/></response></ajax-response>");
* writer.flush(); String m=swriter.toString();
* System.err.println(m); response.getWriter().write(m);
* response.getWriter().flush();
*/
} catch (JMSException e) {
throw new ServletException("Could not post JMS message: " + e, e);
}
}
/**
* Supports a HTTP DELETE to be equivlanent of consuming a singe message
* from a queue
*/
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
try {
WebClient client = getWebClient(request);
// System.err.println("GET client="+client+"
// session="+request.getSession().getId()+"
// info="+request.getPathInfo()+" query="+request.getQueryString());
Destination destination = getDestination(client, request);
if (destination != null)
doSubscription(client, destination, request, response);
else
doMessages(client, request, response);
}
catch (JMSException e) {
throw new ServletException("JMS problem: " + e, e);
}
}
/**
* Reads a message from a destination up to some specific timeout period
*
* @param client
* The webclient
* @param request
* @param response
* @throws ServletException
* @throws IOException
*/
protected void doMessages(WebClient client, HttpServletRequest request, HttpServletResponse response) throws JMSException, IOException {
int messages = 0;
// This is a poll for any messages
long timeout = getReadTimeout(request);
Continuation continuation = null;
Message message = null;
Listener listener = getListener(request);
synchronized (client) {
List consumers = client.getConsumers();
MessageAvailableConsumer consumer = null;
// Look for a message that is ready to go
for (int i = 0; message == null && i < consumers.size(); i++) {
consumer = (MessageAvailableConsumer) consumers.get(i);
if (consumer.getAvailableListener() == null)
continue;
// Look for any available messages
message = consumer.receiveNoWait();
// System.err.println("received "+message+" from "+consumer);
}
// Get an existing Continuation or create a new one if there are no
// messages
if (message == null) {
continuation = ContinuationSupport.getContinuation(request, client);
// register this continuation with our listener.
listener.setContinuation(continuation);
// Get the continuation object (may wait and/or retry
// request here).
continuation.suspend(timeout);
}
// prepare the responds
response.setContentType("text/xml");
StringWriter swriter = new StringWriter();
PrintWriter writer = new PrintWriter(swriter);
Map consumerIdMap = getConsumerIdMap(request);
response.setStatus(HttpServletResponse.SC_OK);
writer.println("<ajax-response>");
// Send any message we already have
if (message != null) {
String id = (String) consumerIdMap.get(consumer);
writer.print("<response type='object' id='");
writer.print(id);
writer.print("'>\n");
writeMessageResponse(writer, message);
writer.println("</response>");
messages++;
}
// Send the rest of the messages
for (int i = 0; i < consumers.size() && messages < maximumMessages; i++) {
consumer = (MessageAvailableConsumer) consumers.get(i);
if (consumer.getAvailableListener() == null)
continue;
// Look for any available messages
message = consumer.receiveNoWait();
// System.err.println("received "+message+" from "+consumer);
while (message != null && messages < maximumMessages) {
Destination destination = message.getJMSDestination();
String id = (String) consumerIdMap.get(consumer);
writer.print("<response type='object' id='");
writer.print(id);
writer.print("'>");
writeMessageResponse(writer, message);
writer.println("</response>");
messages++;
message = consumer.receiveNoWait();
}
}
// Add poll message
writer.println("<response type='object' id='amqPoll'><ok/></response>");
writer.println("</ajax-response>");
writer.flush();
String m = swriter.toString();
// System.err.println(m);
response.getWriter().write(m);
}
}
/**
* Subscribe or unsubscribe to a destination. The listen request parameter
* is used to indicate subscribe (tree) or unsubscribe (false).
*
* @param request
* @param response
* @throws ServletException
* @throws IOException
*/
protected void doSubscription(WebClient client, Destination destination, HttpServletRequest request, HttpServletResponse response) throws JMSException, ServletException, IOException {
String s = request.getParameter("listen");
if (s == null || s.length() == 0) {
log.warn("No listen paramenter for subscribe");
response.sendError(HttpServletResponse.SC_BAD_REQUEST, "No listen parameter");
return;
}
boolean listen = Boolean.valueOf(s).booleanValue();
String id = request.getParameter("id");
if (listen && (id == null || id.length() == 0)) {
log.warn("No id paramenter for subscribe");
response.sendError(HttpServletResponse.SC_BAD_REQUEST, "No id parameter");
return;
}
Listener listener = getListener(request);
Map consumerIdMap = getConsumerIdMap(request);
synchronized (client) {
MessageAvailableConsumer consumer = (MessageAvailableConsumer) client.getConsumer(destination);
if (listen) {
consumer.setAvailableListener(listener);
consumerIdMap.put(consumer, id);
// System.err.println("Subscribed: "+consumer+" to
// "+destination);
} else {
// TODO should we destroy consumer on unsubscribe?
consumer.setAvailableListener(null);
consumerIdMap.remove(consumer);
// System.err.println("Unsubscribed: "+consumer);
}
}
response.setStatus(HttpServletResponse.SC_NO_CONTENT);
}
protected void writeMessageResponse(PrintWriter writer, Message message) throws JMSException, IOException {
if (message instanceof TextMessage) {
TextMessage textMsg = (TextMessage) message;
writer.print(textMsg.getText());
} else if (message instanceof ObjectMessage) {
ObjectMessage objectMsg = (ObjectMessage) message;
Object object = objectMsg.getObject();
writer.print(object.toString());
}
}
protected Listener getListener(HttpServletRequest request) {
HttpSession session = request.getSession();
Listener listener = (Listener) session.getAttribute("mls.listener");
if (listener == null) {
listener = new Listener(getWebClient(request));
session.setAttribute("mls.listener", listener);
}
return listener;
}
protected Map getConsumerIdMap(HttpServletRequest request) {
HttpSession session = request.getSession();
Map map = (Map) session.getAttribute("mls.consumerIdMap");
if (map == null) {
map = new HashMap();
session.setAttribute("mls.consumerIdMap", map);
}
return map;
}
protected boolean isRicoAjax(HttpServletRequest request) {
String rico = request.getParameter("rico");
return rico != null && rico.equals("true");
}
/**
* @return the timeout value for read requests which is always >= 0 and <=
* maximumReadTimeout to avoid DoS attacks
*/
protected long getReadTimeout(HttpServletRequest request) {
long answer = defaultReadTimeout;
String name = request.getParameter(readTimeoutParameter);
if (name != null) {
answer = asLong(name);
}
if (answer < 0 || answer > maximumReadTimeout) {
answer = maximumReadTimeout;
}
return answer;
}
/*
* Listen for available messages and wakeup any continuations.
*/
private class Listener implements MessageAvailableListener {
WebClient client;
Continuation continuation;
List queue = new LinkedList();
Listener(WebClient client) {
this.client = client;
}
public void setContinuation(Continuation continuation) {
synchronized (client) {
this.continuation = continuation;
}
}
public void onMessageAvailable(MessageConsumer consumer) {
synchronized (client) {
// System.err.println("message for "+consumer+"
// continuation="+continuation);
if (continuation != null)
continuation.resume();
continuation = null;
}
}
}
}

View File

@ -86,6 +86,8 @@ public class MessageServlet extends MessageServletSupport {
// lets create the destination from the URI?
Destination destination = getDestination(client, request);
if (destination==null)
throw new NoDestinationSuppliedException();
if (log.isDebugEnabled()) {
log.debug("Sending message to: " + destination + " with text: " + text);
@ -134,6 +136,8 @@ public class MessageServlet extends MessageServletSupport {
try {
WebClient client = getWebClient(request);
Destination destination = getDestination(client, request);
if (destination==null)
throw new NoDestinationSuppliedException();
long timeout = getReadTimeout(request);
boolean ajax = isRicoAjax(request);
if (!ajax)

View File

@ -136,7 +136,7 @@ public abstract class MessageServletSupport extends HttpServlet {
/**
* @return the destination to use for the current request
*/
protected Destination getDestination(WebClient client, HttpServletRequest request) throws JMSException, NoDestinationSuppliedException {
protected Destination getDestination(WebClient client, HttpServletRequest request) throws JMSException {
String destinationName = request.getParameter(destinationParameter);
if (destinationName == null) {
if (defaultDestination == null) {
@ -154,16 +154,17 @@ public abstract class MessageServletSupport extends HttpServlet {
* @return the destination to use for the current request using the relative URI from
* where this servlet was invoked as the destination name
*/
protected Destination getDestinationFromURI(WebClient client, HttpServletRequest request) throws NoDestinationSuppliedException, JMSException {
protected Destination getDestinationFromURI(WebClient client, HttpServletRequest request) throws JMSException {
String uri = request.getPathInfo();
if (uri == null) {
throw new NoDestinationSuppliedException();
return null;
}
// replace URI separator with JMS destination separator
if (uri.startsWith("/")) {
uri = uri.substring(1);
}
uri = uri.replace('/', '.');
System.err.println("destination uri="+uri);
return getDestination(client, request, uri);
}
@ -182,9 +183,7 @@ public abstract class MessageServletSupport extends HttpServlet {
/**
* @return true if the current request is for a topic destination, else false if its for a queue
*/
protected boolean isTopic
(HttpServletRequest
request) {
protected boolean isTopic(HttpServletRequest request) {
boolean aTopic = defaultTopicFlag;
String aTopicText = request.getParameter(topicParameter);
if (aTopicText != null) {

View File

@ -21,7 +21,10 @@ import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.jms.ConnectionFactory;
@ -228,6 +231,18 @@ public class WebClient implements HttpSessionActivationListener, Externalizable
}
}
}
public synchronized List getConsumers()
{
ArrayList list = new ArrayList(topicConsumers.size()+queueConsumers.size());
// TODO check this double synchronization on queue but not on topics
synchronized (queueConsumers) {
list.addAll(queueConsumers.values());
}
list.addAll(topicConsumers.values());
return list;
}
protected ActiveMQSession createSession() throws JMSException {
return (ActiveMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);

View File

@ -45,11 +45,11 @@
<!-- servlet mappings -->
<!-- the main REST servlet -->
<!-- the subscription REST servlet -->
<servlet>
<servlet-name>MessageServlet</servlet-name>
<servlet-class>org.apache.activemq.web.MessageServlet</servlet-class>
<servlet-name>MessageListenerServlet</servlet-name>
<servlet-class>org.apache.activemq.web.MessageListenerServlet</servlet-class>
<load-on-startup>1</load-on-startup>
</servlet>
@ -59,11 +59,12 @@
<servlet-class>org.apache.activemq.web.PortfolioPublishServlet</servlet-class>
<load-on-startup>1</load-on-startup>
</servlet>
<servlet-mapping>
<servlet-name>MessageServlet</servlet-name>
<url-pattern>/jms/*</url-pattern>
<servlet-name>MessageListenerServlet</servlet-name>
<url-pattern>/amq/*</url-pattern>
</servlet-mapping>
<servlet-mapping>
<servlet-name>PortfolioPublishServlet</servlet-name>
<url-pattern>/portfolioPublish</url-pattern>

View File

@ -0,0 +1,62 @@
div#chatroom
{
width: 41em;
background-color: #e0e0e0;
border: 1px solid black;
}
div#chat
{
float: left;
width: 30em;
height: 20ex;
overflow: auto;
background-color: #f0f0f0;
padding: 4px;
border-right: 1px solid black;
}
div#members
{
float: left;
clear: right;
width: 10em;
border: 0px solid black;
}
div#input
{
clear: both;
padding: 4px;
border-top: 1px solid black;
}
input#phrase
{
width:28em;
background-color: #e0f0f0;
}
input#username
{
width:14em;
background-color: #e0f0f0;
}
div.hidden
{
display: none;
}
span.from
{
font-weight: bold;
}
span.alert
{
font-style: italic;
}

View File

@ -1,20 +1,34 @@
<html>
<head>
<title>Chat</title>
<link rel="stylesheet" href="style.css" type="text/css">
</head>
<body onload="chooseNickName()">
<head>
<title>Chat</title>
<script type="text/javascript" src="../js/default.js"></script>
<script type="text/javascript" src="chat.js"></script>
<link rel="stylesheet" href="chat.css" type="text/css">
<link rel="stylesheet" href="style.css" type="text/css">
</head>
<body>
<h1>Chat Example</h1>
<h1>Chat Example</h1>
Welcome to this little chat example<br><br>
Welcome to this little chat example
<br>
<br>
<textarea id="chatLog" rows="20" cols="100"></textarea><br>
Type here: <input id="userInput" type="text" size="70">
<button onclick="saySomething()" default="true">Send</button>
<button onclick="chooseNickName()">Choose NickName</button>
<div id="chatroom">
<div id="chat"></div>
<script src="webmq.js" lanaguage="JavaScript1.2"></script>
<script src="chat.js" language="JavaScript1.2"></script>
</body>
</html>
<div id="members"></div>
<div id="input">
<div id="join" class="hidden">Username:&nbsp;<input id="username" type="text" />
<input id="joinB" class="button" type="submit" name="join" value="Join" />
</div>
<div id="joined" class="hidden">Chat:&nbsp;<input id="phrase" type="text"></input>
<input id="sendB" class="button" type="submit" name="join" value="Send" />
<input id="leaveB" class="button" type="submit" name="join" value="Leave" />
</div>
</div>
</div>
</body>
</html>

View File

@ -2,37 +2,12 @@
// Original code by Joe Walnes
// -----------------
function chooseNickName() {
var newNickName = prompt("Please choose a nick name", nickName)
if (newNickName) {
connection.sendMessage(chatTopic, "<message type='status'>" + nickName + " is now known as " + newNickName + "</message>")
nickName = newNickName
}
}
// when user clicks 'send', broadcast a message
function saySomething() {
var text = document.getElementById("userInput").value
connection.sendMessage(chatTopic, "<message type='chat' from='" + nickName + "'>" + text + "</message>")
document.getElementById("userInput").value = ""
}
var connection = null;
var chatTopic = "CHAT.DEMO";
var chatMembership = "CHAT.DEMO";
// when message is received from topic, display it in chat log
function receiveMessage(message) {
var root = message.documentElement
var chatLog = document.getElementById("chatLog")
var type = root.getAttribute('type')
if (type == 'status') {
chatLog.value += "*** " + elementText(root) + "\n"
}
else if (type == 'chat') {
chatLog.value += "<" + root.getAttribute('from') + "> " + elementText(root) + "\n"
}
else {
chatLog.value += "*** Unknown type: " + type + " for: " + root + "\n"
}
}
// returns the text of an XML element
function elementText(element) {
@ -48,8 +23,201 @@ function elementText(element) {
return answer
}
var connection = new Connection("jms/FOO/BAR")
var chatTopic = "FOO.BAR"
connection.addMessageListener(chatTopic, receiveMessage)
var nickName = "unknown"
document.getElementById("chatLog").value = ''
var room =
{
last: "",
username: "unknown",
join: function()
{
var name = $('username').value;
if (name == null || name.length==0 )
{
alert("Please enter a username!");
}
else
{
username=name;
amq.addTopicListener('chat',chatTopic,room);
// amq.sendMessage(chatSubscription, "<subscribe>" + username + "</subscribe>");
// switch the input form
$('join').className='hidden';
$('joined').className='';
$('phrase').focus();
Behaviour.apply();
amq.sendMessage(chatMembership, "<message type='join' from='" + username + "'/>");
}
},
leave: function()
{
amq.sendMessage(chatMembership, "<message type='leave' from='" + username + "'/>");
// switch the input form
$('join').className='';
$('joined').className='hidden';
$('username').focus();
username=null;
Behaviour.apply();
},
chat: function()
{
var text = $('phrase').value;
if (text != null && text.length>0 )
{
// TODO more encoding?
text=text.replace('<','&lt;');
text=text.replace('>','&gt;');
amq.sendMessage(chatTopic, "<message type='chat' from='" + username + "'>" + text + "</message>");
$('phrase').value="";
}
},
amqMessage: function(message)
{
var chat=$('chat');
var type=message.attributes['type'].value;
var from=message.attributes['from'].value;
switch(type)
{
case 'chat' :
{
var text=message.childNodes[0].data;
var alert='false';
if ( from == this.last )
from="...";
else
{
this.last=from;
from+=":";
}
chat.innerHTML += "<span class=\"from\">"+from+"&nbsp;</span><span class=\"text\">"+text+"</span><br/>";
break;
}
case 'ping' :
{
var li = document.createElement('li');
li.innerHtml=from;
$('members').innerHTML+="<span class=\"member\">"+from+"</span><br/>";
break;
}
case 'join' :
{
$('members').innerHTML="";
if (username!=null)
amq.sendMessage(chatMembership, "<message type='ping' from='" + username + "'/>");
chat.innerHTML += "<span class=\"alert\"><span class=\"from\">"+from+"&nbsp;</span><span class=\"text\">has joined the room!</span></span><br/>";
break;
}
case 'leave':
{
$('members').innerHTML="";
if (username!=null)
amq.sendMessage(chatMembership, "<message type='ping' from='" + username + "'/>");
chat.innerHTML += "<span class=\"alert\"><span class=\"from\">"+from+"&nbsp;</span><span class=\"text\">has left the room!</span></span><br/>";
break;
}
}
chat.scrollTop = chat.scrollHeight - chat.clientHeight;
}
};
function chatPoll(first)
{
if (first || $('join').className=='hidden' && $('joined').className=='hidden')
{
$('join').className='';
$('joined').className='hidden';
$('username').focus();
Behaviour.apply();
}
}
function chatInit()
{
amq.addPollHandler(chatPoll);
}
var behaviours =
{
'#username' : function(element)
{
element.setAttribute("autocomplete","OFF");
element.onkeypress = function(event)
{
if (event && (event.keyCode==13 || event.keyCode==10))
{
room.join();
}
}
},
'#joinB' : function(element)
{
element.onclick = function()
{
room.join();
}
},
'#phrase' : function(element)
{
element.setAttribute("autocomplete","OFF");
element.onkeypress = function(event)
{
if (event && (event.keyCode==13 || event.keyCode==10))
{
room.chat();
return false;
}
return true;
}
},
'#sendB' : function(element)
{
element.onclick = function()
{
room.chat();
}
},
'#leaveB' : function(element)
{
element.onclick = function()
{
room.leave();
}
}
};
Behaviour.register(behaviours);
Behaviour.addLoadEvent(chatInit);

View File

@ -0,0 +1,80 @@
// AMQ handler
var amq =
{
first: true,
pollEvent: function(first) {},
addPollHandler : function(func)
{
var old = this.pollEvent;
this.pollEvent = function(first)
{
old(first);
func(first);
}
},
sendMessage : function(destination,message)
{
ajaxEngine.sendRequestWithData('amqSend', message, 'destination='+destination);
},
// Listen on a topic. handler must have a amqMessage method taking a message arguement
addTopicListener : function(id,topic,handler)
{
var ajax2amq = { ajaxUpdate: function(msg) { handler.amqMessage(amqFirstElement(msg)) } };
ajaxEngine.registerAjaxObject(id, ajax2amq);
ajaxEngine.sendRequest('amqListen', 'destination='+topic+'&topic=true&id='+id+'&listen=true');
},
// Listen on a channel. handler must have a amqMessage method taking a message arguement
addChannelListener: function(id,channel,handler)
{
var ajax2amq = { ajaxUpdate: function(msg) { handler.amqMessage(amqFirstElement(msg)) } };
ajaxEngine.registerAjaxObject(id, ajax2amq);
ajaxEngine.sendRequest('amqListen', 'destination='+channel+'&topic=false&id='+id+'&listen=true');
}
};
function initAMQ()
{
// Register URLs. All the same at the moment and params are used to distinguish types
ajaxEngine.registerRequest('amqListen','/amq');
ajaxEngine.registerRequest('amqPoll', '/amq');
ajaxEngine.registerRequest('amqSend', '/amq');
var pollHandler = {
ajaxUpdate: function(ajaxResponse)
{
// Poll again for events
amq.pollEvent(amq.first);
amq.first=false;
ajaxEngine.sendRequest('amqPoll');
}
};
ajaxEngine.registerAjaxObject('amqPoll', pollHandler);
var sendHandler = {
ajaxUpdate: function(ajaxResponse)
{
}
};
ajaxEngine.registerAjaxObject('amqSend', sendHandler);
ajaxEngine.sendRequest('amqPoll',"timeout=0"); // first poll to establish polling session ID
}
Behaviour.addLoadEvent(initAMQ);
function amqFirstElement(t) {
for (i = 0; i < t.childNodes.length; i++) {
var child = t.childNodes[i]
if (child.nodeType == 1) {
return child
}
}
return null
}

View File

@ -1,7 +1,7 @@
// Technique borrowed from scriptaculous to do includes.
var DefaultJS = {
Version: 'Jetty Test',
Version: 'AMQ default JS',
script: function(libraryName) {
document.write('<script type="text/javascript" src="'+libraryName+'"></script>');
},
@ -19,10 +19,12 @@ var DefaultJS = {
this.script(path + 'ricoAjax.js');
this.script(path + 'scriptaculous.js');
this.script(path + 'amq.js');
break;
}
}
}
}
DefaultJS.load();
DefaultJS.load();

View File

@ -1,32 +1,21 @@
var PollHandler =
{
ajaxUpdate: function(ajaxResponse)
{
// Poll again for events
ajaxEngine.sendRequest('getEvents');
}
};
var PriceHandler =
var priceHandler =
{
ajaxUpdate: function(ajaxResponse)
amqMessage: function(message)
{
var priceMessage = firstElement(ajaxResponse)
if (priceMessage != null) {
if (message != null) {
var price = parseFloat(priceMessage.getAttribute('bid'))
var symbol = priceMessage.getAttribute('stock')
var movement = priceMessage.getAttribute('movement')
if (movement == null) {
var price = parseFloat(message.getAttribute('bid'))
var symbol = message.getAttribute('stock')
var movement = message.getAttribute('movement')
if (movement == null) {
movement = 'up'
}
}
//alert('Received price ' + priceMessage + ' for price: ' + price + ' symbol: ' + symbol + ' movement: ' + movement)
var row = document.getElementById(symbol)
if (row) {
var row = document.getElementById(symbol)
if (row) {
// perform portfolio calculations
var value = asFloat(find(row, 'amount')) * price
var pl = value - asFloat(find(row, 'cost'))
@ -37,24 +26,26 @@ var PriceHandler =
find(row, 'pl').innerHTML = fixedDigits(pl, 2)
find(row, 'price').className = movement
find(row, 'pl').className = pl >= 0 ? 'up' : 'down'
}
}
}
}
};
function initPage()
{
ajaxEngine.registerRequest('getEvents', '/jms/STOCKS/*?rico=true&id=priceChange');
ajaxEngine.registerAjaxObject('poll', PollHandler);
ajaxEngine.registerAjaxObject('priceChange', PriceHandler);
ajaxEngine.sendRequest('getEvents');
function portfolioPoll(first)
{
if (first)
{
amq.addTopicListener('stocks','STOCKS.*',priceHandler);
}
}
Behaviour.addLoadEvent(initPage);
function portfolioInit()
{
amq.addPollHandler(portfolioPoll);
}
Behaviour.addLoadEvent(portfolioInit);
/// -----------------
// Original code by Joe Walnes
@ -82,16 +73,6 @@ function find(t, id) {
return null
}
function firstElement(t) {
for (i = 0; i < t.childNodes.length; i++) {
var child = t.childNodes[i]
if (child.nodeType == 1) {
return child
}
}
return null
}
/**
* Return the text contents of an element as a floating point number.
*/

View File

@ -134,8 +134,8 @@ geronimo_core_version=1.0-SNAPSHOT
<!-- used by transport-http module -->
commons_httpclient_version=2.0.1
servlet_api_version=2.5-6.0.0beta6
jetty_version=6.0.0beta6
servlet_api_version=2.5-6.0-SNAPSHOT
jetty_version=6.0-SNAPSHOT
tomcat_version=5.0.28
xercesImpl_version=2.6.2