From 755ffd58a8c9bcc6e79f3152f0a43e6f79f41a21 Mon Sep 17 00:00:00 2001 From: Bosanac Dejan Date: Wed, 13 Oct 2010 11:41:16 +0000 Subject: [PATCH] https://issues.apache.org/activemq/browse/AMQ-2948 - ajax support for multiple clients in the same session git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1022071 13f79535-47bb-0310-9956-ffa450edef68 --- activemq-web-demo/src/main/webapp/chat.html | 2 +- activemq-web-demo/src/main/webapp/js/amq.js | 31 +++- .../src/main/webapp/test/amq_test.html | 20 ++ .../org/apache/activemq/web/AjaxListener.java | 75 ++++++++ .../apache/activemq/web/AjaxWebClient.java | 91 +++++++++ .../activemq/web/MessageListenerServlet.java | 173 ++++++++---------- 6 files changed, 292 insertions(+), 100 deletions(-) create mode 100644 activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java create mode 100644 activemq-web/src/main/java/org/apache/activemq/web/AjaxWebClient.java diff --git a/activemq-web-demo/src/main/webapp/chat.html b/activemq-web-demo/src/main/webapp/chat.html index b8a1ebddd2..73592bc132 100755 --- a/activemq-web-demo/src/main/webapp/chat.html +++ b/activemq-web-demo/src/main/webapp/chat.html @@ -55,7 +55,7 @@ // org.activemq.Chat.init(); // } window.onload = function() { - org.activemq.Amq.init({ uri: 'amq', logging: true, timeout: 45 }); + org.activemq.Amq.init({ uri: 'amq', logging: true, timeout: 45, clientId:(new Date()).getTime().toString() }); org.activemq.Chat.init(); }; diff --git a/activemq-web-demo/src/main/webapp/js/amq.js b/activemq-web-demo/src/main/webapp/js/amq.js index 218ebe1a45..877afec57f 100644 --- a/activemq-web-demo/src/main/webapp/js/amq.js +++ b/activemq-web-demo/src/main/webapp/js/amq.js @@ -70,6 +70,11 @@ org.activemq.Amq = function() { // message, messageType }. var messageQueue = []; + // String to distinguish this client from others sharing the same session. + // This can occur when multiple browser windows or tabs using amq.js simultaneously. + // All windows share the same JESSIONID, but need to consume messages independently. + var clientId = null; + /** * Iterate over the returned XML and for each message in the response, * invoke the handler with the matching id. @@ -138,9 +143,8 @@ org.activemq.Amq = function() { var data = 'timeout=' + timeout * 1000 + '&d=' + now.getTime() + '&r=' + Math.random(); - var options = { method: 'get', - data: data, + data: addClientId( data ), success: pollHandler, error: pollErrorHandler}; adapter.ajax(uri, options); @@ -158,7 +162,7 @@ org.activemq.Amq = function() { } else { org.activemq.Amq.startBatch(); adapter.ajax(uri, { method: 'post', - data: buildParams( [message] ), + data: addClientId( buildParams( [message] ) ), error: errorHandler, headers: headers, success: org.activemq.Amq.endBatch}); @@ -181,18 +185,33 @@ org.activemq.Amq = function() { } return s.join(''); } + + // add clientId to data if it exists, before passing data to ajax connection adapter. + var addClientId = function( data ) { + var output = data || ''; + if( clientId ) { + if( output.length > 0 ) { + output += '&'; + } + output += 'clientId='+clientId; + } + return output; + } return { + // optional clientId can be supplied to allow multiple clients (browser windows) within the same session. init : function(options) { connectStatusHandler = options.connectStatusHandler || function(connected){}; uri = options.uri || '/amq'; pollDelay = typeof options.pollDelay == 'number' ? options.pollDelay : 0; timeout = typeof options.timeout == 'number' ? options.timeout : 25; logging = options.logging; + clientId = options.clientId; adapter.init(options); sendPoll(); + }, - + startBatch : function() { batchInProgress = true; }, @@ -205,7 +224,7 @@ org.activemq.Amq = function() { // we need to ensure that messages which set headers are sent by themselves. // if 2 'listen' messages were sent together, and a 'selector' header were added to one of them, - // AMQ would add the selector to both 'listen' commands. + // AMQ would add the selector to both 'listen' commands. for(i=0;itest' ); + org.activemq.Amq.removeListener( 'id', 'topic://test' ); + org.activemq.Amq.endBatch(); + + var requests = org.activemq.AmqAdapter.getRequests(); + var clientNameRegex = /clientId=uniqueClientName/; + + assertEqual( 3, requests.length ); + assertMatch( clientNameRegex, requests[0].options.data ); + assertMatch( clientNameRegex, requests[1].options.data ); + assertMatch( clientNameRegex, requests[2].options.data ); + }} }); diff --git a/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java b/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java new file mode 100644 index 0000000000..b7b0133eff --- /dev/null +++ b/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.web; + +import javax.jms.Message; +import javax.jms.MessageConsumer; + +import org.eclipse.jetty.continuation.Continuation; +import org.eclipse.jetty.continuation.ContinuationSupport; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.activemq.MessageAvailableListener; + +/* + * Listen for available messages and wakeup any continuations. + */ +public class AjaxListener implements MessageAvailableListener { + private static final Log LOG = LogFactory.getLog(AjaxListener.class); + + private long maximumReadTimeout; + private AjaxWebClient client; + private long lastAccess; + private Continuation continuation; + + AjaxListener(AjaxWebClient client, long maximumReadTimeout) { + this.client = client; + this.maximumReadTimeout = maximumReadTimeout; + } + + public void access() { + lastAccess = System.currentTimeMillis(); + } + + public synchronized void setContinuation(Continuation continuation) { + this.continuation = continuation; + } + + public synchronized void onMessageAvailable(MessageConsumer consumer) { + if (LOG.isDebugEnabled()) { + LOG.debug("message for " + consumer + "continuation=" + continuation); + } + if (continuation != null) { + try { + Message message = consumer.receive(10); + continuation.setAttribute("message", message); + continuation.setAttribute("consumer", consumer); + } catch (Exception e) { + LOG.error("Error receiving message " + e, e); + } + continuation.resume(); + } else if (System.currentTimeMillis() - lastAccess > 2 * this.maximumReadTimeout) { + new Thread() { + public void run() { + client.closeConsumers(); + }; + }.start(); + } + } +} diff --git a/activemq-web/src/main/java/org/apache/activemq/web/AjaxWebClient.java b/activemq-web/src/main/java/org/apache/activemq/web/AjaxWebClient.java new file mode 100644 index 0000000000..84bb65e6ad --- /dev/null +++ b/activemq-web/src/main/java/org/apache/activemq/web/AjaxWebClient.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.web; + +import java.util.HashMap; +import java.util.Map; +import java.util.Date; + +import javax.jms.MessageConsumer; +import javax.servlet.http.HttpServletRequest; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.activemq.MessageAvailableConsumer; + +/* + * Collection of all data needed to fulfill requests from a single web client. + */ +public class AjaxWebClient extends WebClient { + private static final Log LOG = LogFactory.getLog(AjaxWebClient.class); + + // an instance which has not been accessed in this many milliseconds can be removed. + final long expireAfter = 60 * 1000; + + Map idMap; + Map destinationNameMap; + AjaxListener listener; + Long lastAccessed; + + public AjaxWebClient( HttpServletRequest request, long maximumReadTimeout ) { + // 'id' meaning the first argument to the JavaScript addListener() function. + // used to indicate which JS callback should handle a given message. + this.idMap = new HashMap(); + + // map consumers to destinations like topic://test, etc. + this.destinationNameMap = new HashMap(); + + this.listener = new AjaxListener( this, maximumReadTimeout ); + + this.lastAccessed = this.getNow(); + } + + public Map getIdMap() { + return this.idMap; + } + + public Map getDestinationNameMap() { + return this.destinationNameMap; + } + + public AjaxListener getListener() { + return this.listener; + } + + public long getMillisSinceLastAccessed() { + return this.getNow() - this.lastAccessed; + } + + public void updateLastAccessed() { + this.lastAccessed = this.getNow(); + } + + public boolean closeIfExpired() { + long now = (new Date()).getTime(); + boolean returnVal = false; + if( this.getMillisSinceLastAccessed() > this.expireAfter ) { + this.close(); + returnVal = true; + } + return returnVal; + } + + protected long getNow() { + return (new Date()).getTime(); + } +} diff --git a/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java b/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java index 7570c88d52..d5fdd93129 100644 --- a/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java +++ b/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java @@ -23,6 +23,10 @@ import java.io.StringWriter; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Date; +import java.util.Iterator; +import java.util.Timer; +import java.util.TimerTask; import javax.jms.Destination; import javax.jms.JMSException; @@ -66,12 +70,14 @@ import org.eclipse.jetty.continuation.ContinuationSupport; */ public class MessageListenerServlet extends MessageServletSupport { private static final Log LOG = LogFactory.getLog(MessageListenerServlet.class); - + private String readTimeoutParameter = "timeout"; private long defaultReadTimeout = -1; private long maximumReadTimeout = 25000; private int maximumMessages = 100; - + private Timer clientCleanupTimer = new Timer(); + private HashMap ajaxWebClients = new HashMap(); + public void init() throws ServletException { ServletConfig servletConfig = getServletConfig(); String name = servletConfig.getInitParameter("defaultReadTimeout"); @@ -86,8 +92,9 @@ public class MessageListenerServlet extends MessageServletSupport { if (name != null) { maximumMessages = (int)asLong(name); } + clientCleanupTimer.schedule( new ClientCleaner(), 5000, 60000 ); } - + /** * Sends a message to a destination or manage subscriptions. If the the * content type of the POST is @@ -110,14 +117,13 @@ public class MessageListenerServlet extends MessageServletSupport { protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { // lets turn the HTTP post into a JMS Message - - WebClient client = WebClient.getWebClient(request); + AjaxWebClient client = getAjaxWebClient( request ); String messageIds = ""; synchronized (client) { if (LOG.isDebugEnabled()) { - LOG.debug("POST client=" + client + " session=" + request.getSession().getId() + " info=" + request.getPathInfo() + " contentType=" + request.getContentType()); + LOG.debug("POST client=" + client + " session=" + request.getSession().getId() + " clientId="+ request.getParameter("clientId") + " info=" + request.getPathInfo() + " contentType=" + request.getContentType()); // dump(request.getParameterMap()); } @@ -151,27 +157,27 @@ public class MessageListenerServlet extends MessageServletSupport { messages++; if ("listen".equals(type)) { - Listener listener = getListener(request); - Map consumerIdMap = getConsumerIdMap(request); - Map consumerDestinationMap = getConsumerDestinationNameMap(request); + AjaxListener listener = client.getListener(); + Map consumerIdMap = client.getIdMap(); + Map consumerDestinationNameMap = client.getDestinationNameMap(); client.closeConsumer(destination); // drop any existing // consumer. MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName)); consumer.setAvailableListener(listener); consumerIdMap.put(consumer, message); - consumerDestinationMap.put(consumer, destinationName); + consumerDestinationNameMap.put(consumer, destinationName); if (LOG.isDebugEnabled()) { LOG.debug("Subscribed: " + consumer + " to " + destination + " id=" + message); } } else if ("unlisten".equals(type)) { - Map consumerIdMap = getConsumerIdMap(request); - Map consumerDestinationMap = getConsumerDestinationNameMap(request); + Map consumerIdMap = client.getIdMap(); + Map consumerDestinationNameMap = client.getDestinationNameMap(); MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName)); consumer.setAvailableListener(null); consumerIdMap.remove(consumer); - consumerDestinationMap.remove(consumer); + consumerDestinationNameMap.remove(consumer); client.closeConsumer(destination); if (LOG.isDebugEnabled()) { LOG.debug("Unsubscribed: " + consumer); @@ -233,9 +239,9 @@ public class MessageListenerServlet extends MessageServletSupport { */ protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { try { - WebClient client = WebClient.getWebClient(request); + AjaxWebClient client = getAjaxWebClient(request); if (LOG.isDebugEnabled()) { - LOG.debug("GET client=" + client + " session=" + request.getSession().getId() + " uri=" + request.getRequestURI() + " query=" + request.getQueryString()); + LOG.debug("GET client=" + client + " session=" + request.getSession().getId() + " clientId="+ request.getParameter("clientId") + " uri=" + request.getRequestURI() + " query=" + request.getQueryString()); } doMessages(client, request, response); @@ -253,7 +259,7 @@ public class MessageListenerServlet extends MessageServletSupport { * @throws ServletException * @throws IOException */ - protected void doMessages(WebClient client, HttpServletRequest request, HttpServletResponse response) throws JMSException, IOException { + protected void doMessages(AjaxWebClient client, HttpServletRequest request, HttpServletResponse response) throws JMSException, IOException { int messages = 0; // This is a poll for any messages @@ -286,7 +292,11 @@ public class MessageListenerServlet extends MessageServletSupport { } } } - + + // prepare the response + response.setContentType("text/xml"); + response.setHeader("Cache-Control", "no-cache"); + if (message == null) { Continuation continuation = ContinuationSupport.getContinuation(request); @@ -308,23 +318,19 @@ public class MessageListenerServlet extends MessageServletSupport { continuation.suspend(); // Fetch the listeners - Listener listener = getListener(request); + AjaxListener listener = client.getListener(); // register this continuation with our listener. listener.setContinuation(continuation); return; } - - // prepare the responds - response.setContentType("text/xml"); - response.setHeader("Cache-Control", "no-cache"); StringWriter swriter = new StringWriter(); PrintWriter writer = new PrintWriter(swriter); - - Map consumerIdMap = getConsumerIdMap(request); - Map consumerDestinationNameMap = getConsumerDestinationNameMap(request); + + Map consumerIdMap = client.getIdMap(); + Map consumerDestinationNameMap = client.getDestinationNameMap(); response.setStatus(HttpServletResponse.SC_OK); writer.println(""); @@ -388,40 +394,35 @@ public class MessageListenerServlet extends MessageServletSupport { } writer.println(""); } - - protected Listener getListener(HttpServletRequest request) { - HttpSession session = request.getSession(); - Listener listener = (Listener)session.getAttribute("mls.listener"); - if (listener == null) { - listener = new Listener(WebClient.getWebClient(request)); - session.setAttribute("mls.listener", listener); - } - return listener; - } - - protected Map getConsumerIdMap(HttpServletRequest request) { + + /* + * Return the AjaxWebClient for this session+clientId. + * Create one if it does not already exist. + */ + protected AjaxWebClient getAjaxWebClient( HttpServletRequest request ) { + long now = (new Date()).getTime(); HttpSession session = request.getSession(true); - Map map = (Map)session.getAttribute("mls.consumerIdMap"); - if (map == null) { - map = new HashMap(); - session.setAttribute("mls.consumerIdMap", map); + + String clientId = request.getParameter( "clientId" ); + // if user doesn't supply a 'clientId', we'll just use a default. + if( clientId == null ) { + clientId = "defaultAjaxWebClient"; } - return map; - } - - protected Map getConsumerDestinationNameMap(HttpServletRequest request) { - HttpSession session = request.getSession(true); - Map map = (Map)session.getAttribute("mls.consumerDestinationNameMap"); - if (map == null) { - map = new HashMap(); - session.setAttribute("mls.consumerDestinationNameMap", map); + String sessionKey = session.getId() + '-' + clientId; + + AjaxWebClient client = ajaxWebClients.get( sessionKey ); + synchronized (ajaxWebClients) { + // create a new AjaxWebClient if one does not already exist for this sessionKey. + if( client == null ) { + if (LOG.isDebugEnabled()) { + LOG.debug( "creating new AjaxWebClient in "+sessionKey ); + } + client = new AjaxWebClient( request, maximumReadTimeout ); + ajaxWebClients.put( sessionKey, client ); + } + client.updateLastAccessed(); } - return map; - } - - protected boolean isRicoAjax(HttpServletRequest request) { - String rico = request.getParameter("rico"); - return rico != null && rico.equals("true"); + return client; } /** @@ -440,48 +441,34 @@ public class MessageListenerServlet extends MessageServletSupport { } return answer; } - + /* - * Listen for available messages and wakeup any continuations. + * an instance of this class runs every minute (started in init), to clean up old web clients & free resources. */ - private class Listener implements MessageAvailableListener { - WebClient client; - long lastAccess; - Continuation continuation; - - Listener(WebClient client) { - this.client = client; - } - - public void access() { - lastAccess = System.currentTimeMillis(); - } - - public synchronized void setContinuation(Continuation continuation) { - this.continuation = continuation; - } - - public synchronized void onMessageAvailable(MessageConsumer consumer) { - if (LOG.isDebugEnabled()) { - LOG.debug("message for " + consumer + "continuation=" + continuation); + private class ClientCleaner extends TimerTask { + public void run() { + if( LOG.isDebugEnabled() ) { + LOG.debug( "Cleaning up expired web clients." ); } - if (continuation != null) { - try { - Message message = consumer.receive(10); - continuation.setAttribute("message", message); - continuation.setAttribute("consumer", consumer); - } catch (Exception e) { - LOG.error("Error receiving message " + e, e); + + synchronized( ajaxWebClients ) { + Iterator it = ajaxWebClients.entrySet().iterator(); + while ( it.hasNext() ) { + Map.Entry e = (Map.Entry)it.next(); + String key = e.getKey(); + AjaxWebClient val = e.getValue(); + if ( LOG.isDebugEnabled() ) { + LOG.debug( "AjaxWebClient " + key + " last accessed " + val.getMillisSinceLastAccessed()/1000 + " seconds ago." ); } - continuation.resume(); - } else if (System.currentTimeMillis() - lastAccess > 2 * maximumReadTimeout) { - new Thread() { - public void run() { - client.closeConsumers(); - }; - }.start(); + // close an expired client and remove it from the ajaxWebClients hash. + if( val.closeIfExpired() ) { + if ( LOG.isDebugEnabled() ) { + LOG.debug( "Removing expired AjaxWebClient " + key ); + } + it.remove(); + } + } } } - } }