https://issues.apache.org/activemq/browse/AMQ-2948 - ajax support for multiple clients in the same session

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1022071 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2010-10-13 11:41:16 +00:00
parent 80528f64a8
commit 755ffd58a8
6 changed files with 292 additions and 100 deletions

View File

@ -55,7 +55,7 @@
// org.activemq.Chat.init();
// }
window.onload = function() {
org.activemq.Amq.init({ uri: 'amq', logging: true, timeout: 45 });
org.activemq.Amq.init({ uri: 'amq', logging: true, timeout: 45, clientId:(new Date()).getTime().toString() });
org.activemq.Chat.init();
};
</script>

View File

@ -70,6 +70,11 @@ org.activemq.Amq = function() {
// message, messageType }.
var messageQueue = [];
// String to distinguish this client from others sharing the same session.
// This can occur when multiple browser windows or tabs using amq.js simultaneously.
// All windows share the same JESSIONID, but need to consume messages independently.
var clientId = null;
/**
* Iterate over the returned XML and for each message in the response,
* invoke the handler with the matching id.
@ -138,9 +143,8 @@ org.activemq.Amq = function() {
var data = 'timeout=' + timeout * 1000
+ '&d=' + now.getTime()
+ '&r=' + Math.random();
var options = { method: 'get',
data: data,
data: addClientId( data ),
success: pollHandler,
error: pollErrorHandler};
adapter.ajax(uri, options);
@ -158,7 +162,7 @@ org.activemq.Amq = function() {
} else {
org.activemq.Amq.startBatch();
adapter.ajax(uri, { method: 'post',
data: buildParams( [message] ),
data: addClientId( buildParams( [message] ) ),
error: errorHandler,
headers: headers,
success: org.activemq.Amq.endBatch});
@ -181,18 +185,33 @@ org.activemq.Amq = function() {
}
return s.join('');
}
// add clientId to data if it exists, before passing data to ajax connection adapter.
var addClientId = function( data ) {
var output = data || '';
if( clientId ) {
if( output.length > 0 ) {
output += '&';
}
output += 'clientId='+clientId;
}
return output;
}
return {
// optional clientId can be supplied to allow multiple clients (browser windows) within the same session.
init : function(options) {
connectStatusHandler = options.connectStatusHandler || function(connected){};
uri = options.uri || '/amq';
pollDelay = typeof options.pollDelay == 'number' ? options.pollDelay : 0;
timeout = typeof options.timeout == 'number' ? options.timeout : 25;
logging = options.logging;
clientId = options.clientId;
adapter.init(options);
sendPoll();
},
startBatch : function() {
batchInProgress = true;
},
@ -205,7 +224,7 @@ org.activemq.Amq = function() {
// we need to ensure that messages which set headers are sent by themselves.
// if 2 'listen' messages were sent together, and a 'selector' header were added to one of them,
// AMQ would add the selector to both 'listen' commands.
// AMQ would add the selector to both 'listen' commands.
for(i=0;i<messageQueue.length;i++) {
// a message with headers should always be sent by itself. if other messages have been added, send this one later.
if ( messageQueue[ i ].headers && messagesToSend.length == 0 ) {
@ -223,7 +242,7 @@ org.activemq.Amq = function() {
adapter.ajax(uri, {
method: 'post',
headers: outgoingHeaders,
data: body,
data: addClientId( body ),
success: org.activemq.Amq.endBatch,
error: errorHandler});
} else {

View File

@ -282,6 +282,26 @@
org.activemq.Amq.testPollHandler( response );
assertEqual( 'test message', callbackValue.textContent );
}},
testClientIdSpecifiedInInitIsAddedToAllAjaxRequests: function() { with( this ) {
// need to reset to remove the poll message sent when init() is called in setup().
org.activemq.AmqAdapter.reset();
org.activemq.Amq.init({ uri: '../amq', timeout: 30, clientId:'uniqueClientName' });
org.activemq.Amq.addListener( 'id', 'queue://test', function(){} );
org.activemq.Amq.sendMessage( 'queue://test', '<message>test</message>' );
org.activemq.Amq.removeListener( 'id', 'topic://test' );
org.activemq.Amq.endBatch();
var requests = org.activemq.AmqAdapter.getRequests();
var clientNameRegex = /clientId=uniqueClientName/;
assertEqual( 3, requests.length );
assertMatch( clientNameRegex, requests[0].options.data );
assertMatch( clientNameRegex, requests[1].options.data );
assertMatch( clientNameRegex, requests[2].options.data );
}}
});

View File

@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.web;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import org.eclipse.jetty.continuation.Continuation;
import org.eclipse.jetty.continuation.ContinuationSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.activemq.MessageAvailableListener;
/*
* Listen for available messages and wakeup any continuations.
*/
public class AjaxListener implements MessageAvailableListener {
private static final Log LOG = LogFactory.getLog(AjaxListener.class);
private long maximumReadTimeout;
private AjaxWebClient client;
private long lastAccess;
private Continuation continuation;
AjaxListener(AjaxWebClient client, long maximumReadTimeout) {
this.client = client;
this.maximumReadTimeout = maximumReadTimeout;
}
public void access() {
lastAccess = System.currentTimeMillis();
}
public synchronized void setContinuation(Continuation continuation) {
this.continuation = continuation;
}
public synchronized void onMessageAvailable(MessageConsumer consumer) {
if (LOG.isDebugEnabled()) {
LOG.debug("message for " + consumer + "continuation=" + continuation);
}
if (continuation != null) {
try {
Message message = consumer.receive(10);
continuation.setAttribute("message", message);
continuation.setAttribute("consumer", consumer);
} catch (Exception e) {
LOG.error("Error receiving message " + e, e);
}
continuation.resume();
} else if (System.currentTimeMillis() - lastAccess > 2 * this.maximumReadTimeout) {
new Thread() {
public void run() {
client.closeConsumers();
};
}.start();
}
}
}

View File

@ -0,0 +1,91 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.web;
import java.util.HashMap;
import java.util.Map;
import java.util.Date;
import javax.jms.MessageConsumer;
import javax.servlet.http.HttpServletRequest;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.activemq.MessageAvailableConsumer;
/*
* Collection of all data needed to fulfill requests from a single web client.
*/
public class AjaxWebClient extends WebClient {
private static final Log LOG = LogFactory.getLog(AjaxWebClient.class);
// an instance which has not been accessed in this many milliseconds can be removed.
final long expireAfter = 60 * 1000;
Map<MessageAvailableConsumer, String> idMap;
Map<MessageAvailableConsumer, String> destinationNameMap;
AjaxListener listener;
Long lastAccessed;
public AjaxWebClient( HttpServletRequest request, long maximumReadTimeout ) {
// 'id' meaning the first argument to the JavaScript addListener() function.
// used to indicate which JS callback should handle a given message.
this.idMap = new HashMap<MessageAvailableConsumer, String>();
// map consumers to destinations like topic://test, etc.
this.destinationNameMap = new HashMap<MessageAvailableConsumer, String>();
this.listener = new AjaxListener( this, maximumReadTimeout );
this.lastAccessed = this.getNow();
}
public Map<MessageAvailableConsumer, String> getIdMap() {
return this.idMap;
}
public Map<MessageAvailableConsumer, String> getDestinationNameMap() {
return this.destinationNameMap;
}
public AjaxListener getListener() {
return this.listener;
}
public long getMillisSinceLastAccessed() {
return this.getNow() - this.lastAccessed;
}
public void updateLastAccessed() {
this.lastAccessed = this.getNow();
}
public boolean closeIfExpired() {
long now = (new Date()).getTime();
boolean returnVal = false;
if( this.getMillisSinceLastAccessed() > this.expireAfter ) {
this.close();
returnVal = true;
}
return returnVal;
}
protected long getNow() {
return (new Date()).getTime();
}
}

View File

@ -23,6 +23,10 @@ import java.io.StringWriter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Date;
import java.util.Iterator;
import java.util.Timer;
import java.util.TimerTask;
import javax.jms.Destination;
import javax.jms.JMSException;
@ -66,12 +70,14 @@ import org.eclipse.jetty.continuation.ContinuationSupport;
*/
public class MessageListenerServlet extends MessageServletSupport {
private static final Log LOG = LogFactory.getLog(MessageListenerServlet.class);
private String readTimeoutParameter = "timeout";
private long defaultReadTimeout = -1;
private long maximumReadTimeout = 25000;
private int maximumMessages = 100;
private Timer clientCleanupTimer = new Timer();
private HashMap<String,AjaxWebClient> ajaxWebClients = new HashMap<String,AjaxWebClient>();
public void init() throws ServletException {
ServletConfig servletConfig = getServletConfig();
String name = servletConfig.getInitParameter("defaultReadTimeout");
@ -86,8 +92,9 @@ public class MessageListenerServlet extends MessageServletSupport {
if (name != null) {
maximumMessages = (int)asLong(name);
}
clientCleanupTimer.schedule( new ClientCleaner(), 5000, 60000 );
}
/**
* Sends a message to a destination or manage subscriptions. If the the
* content type of the POST is
@ -110,14 +117,13 @@ public class MessageListenerServlet extends MessageServletSupport {
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
// lets turn the HTTP post into a JMS Message
WebClient client = WebClient.getWebClient(request);
AjaxWebClient client = getAjaxWebClient( request );
String messageIds = "";
synchronized (client) {
if (LOG.isDebugEnabled()) {
LOG.debug("POST client=" + client + " session=" + request.getSession().getId() + " info=" + request.getPathInfo() + " contentType=" + request.getContentType());
LOG.debug("POST client=" + client + " session=" + request.getSession().getId() + " clientId="+ request.getParameter("clientId") + " info=" + request.getPathInfo() + " contentType=" + request.getContentType());
// dump(request.getParameterMap());
}
@ -151,27 +157,27 @@ public class MessageListenerServlet extends MessageServletSupport {
messages++;
if ("listen".equals(type)) {
Listener listener = getListener(request);
Map<MessageAvailableConsumer, String> consumerIdMap = getConsumerIdMap(request);
Map<MessageAvailableConsumer, String> consumerDestinationMap = getConsumerDestinationNameMap(request);
AjaxListener listener = client.getListener();
Map<MessageAvailableConsumer, String> consumerIdMap = client.getIdMap();
Map<MessageAvailableConsumer, String> consumerDestinationNameMap = client.getDestinationNameMap();
client.closeConsumer(destination); // drop any existing
// consumer.
MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName));
consumer.setAvailableListener(listener);
consumerIdMap.put(consumer, message);
consumerDestinationMap.put(consumer, destinationName);
consumerDestinationNameMap.put(consumer, destinationName);
if (LOG.isDebugEnabled()) {
LOG.debug("Subscribed: " + consumer + " to " + destination + " id=" + message);
}
} else if ("unlisten".equals(type)) {
Map<MessageAvailableConsumer, String> consumerIdMap = getConsumerIdMap(request);
Map consumerDestinationMap = getConsumerDestinationNameMap(request);
Map<MessageAvailableConsumer, String> consumerIdMap = client.getIdMap();
Map consumerDestinationNameMap = client.getDestinationNameMap();
MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName));
consumer.setAvailableListener(null);
consumerIdMap.remove(consumer);
consumerDestinationMap.remove(consumer);
consumerDestinationNameMap.remove(consumer);
client.closeConsumer(destination);
if (LOG.isDebugEnabled()) {
LOG.debug("Unsubscribed: " + consumer);
@ -233,9 +239,9 @@ public class MessageListenerServlet extends MessageServletSupport {
*/
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
try {
WebClient client = WebClient.getWebClient(request);
AjaxWebClient client = getAjaxWebClient(request);
if (LOG.isDebugEnabled()) {
LOG.debug("GET client=" + client + " session=" + request.getSession().getId() + " uri=" + request.getRequestURI() + " query=" + request.getQueryString());
LOG.debug("GET client=" + client + " session=" + request.getSession().getId() + " clientId="+ request.getParameter("clientId") + " uri=" + request.getRequestURI() + " query=" + request.getQueryString());
}
doMessages(client, request, response);
@ -253,7 +259,7 @@ public class MessageListenerServlet extends MessageServletSupport {
* @throws ServletException
* @throws IOException
*/
protected void doMessages(WebClient client, HttpServletRequest request, HttpServletResponse response) throws JMSException, IOException {
protected void doMessages(AjaxWebClient client, HttpServletRequest request, HttpServletResponse response) throws JMSException, IOException {
int messages = 0;
// This is a poll for any messages
@ -286,7 +292,11 @@ public class MessageListenerServlet extends MessageServletSupport {
}
}
}
// prepare the response
response.setContentType("text/xml");
response.setHeader("Cache-Control", "no-cache");
if (message == null) {
Continuation continuation = ContinuationSupport.getContinuation(request);
@ -308,23 +318,19 @@ public class MessageListenerServlet extends MessageServletSupport {
continuation.suspend();
// Fetch the listeners
Listener listener = getListener(request);
AjaxListener listener = client.getListener();
// register this continuation with our listener.
listener.setContinuation(continuation);
return;
}
// prepare the responds
response.setContentType("text/xml");
response.setHeader("Cache-Control", "no-cache");
StringWriter swriter = new StringWriter();
PrintWriter writer = new PrintWriter(swriter);
Map<MessageAvailableConsumer, String> consumerIdMap = getConsumerIdMap(request);
Map<MessageAvailableConsumer, String> consumerDestinationNameMap = getConsumerDestinationNameMap(request);
Map<MessageAvailableConsumer, String> consumerIdMap = client.getIdMap();
Map<MessageAvailableConsumer, String> consumerDestinationNameMap = client.getDestinationNameMap();
response.setStatus(HttpServletResponse.SC_OK);
writer.println("<ajax-response>");
@ -388,40 +394,35 @@ public class MessageListenerServlet extends MessageServletSupport {
}
writer.println("</response>");
}
protected Listener getListener(HttpServletRequest request) {
HttpSession session = request.getSession();
Listener listener = (Listener)session.getAttribute("mls.listener");
if (listener == null) {
listener = new Listener(WebClient.getWebClient(request));
session.setAttribute("mls.listener", listener);
}
return listener;
}
protected Map<MessageAvailableConsumer, String> getConsumerIdMap(HttpServletRequest request) {
/*
* Return the AjaxWebClient for this session+clientId.
* Create one if it does not already exist.
*/
protected AjaxWebClient getAjaxWebClient( HttpServletRequest request ) {
long now = (new Date()).getTime();
HttpSession session = request.getSession(true);
Map<MessageAvailableConsumer, String> map = (Map<MessageAvailableConsumer, String>)session.getAttribute("mls.consumerIdMap");
if (map == null) {
map = new HashMap<MessageAvailableConsumer, String>();
session.setAttribute("mls.consumerIdMap", map);
String clientId = request.getParameter( "clientId" );
// if user doesn't supply a 'clientId', we'll just use a default.
if( clientId == null ) {
clientId = "defaultAjaxWebClient";
}
return map;
}
protected Map<MessageAvailableConsumer, String> getConsumerDestinationNameMap(HttpServletRequest request) {
HttpSession session = request.getSession(true);
Map<MessageAvailableConsumer, String> map = (Map<MessageAvailableConsumer, String>)session.getAttribute("mls.consumerDestinationNameMap");
if (map == null) {
map = new HashMap<MessageAvailableConsumer, String>();
session.setAttribute("mls.consumerDestinationNameMap", map);
String sessionKey = session.getId() + '-' + clientId;
AjaxWebClient client = ajaxWebClients.get( sessionKey );
synchronized (ajaxWebClients) {
// create a new AjaxWebClient if one does not already exist for this sessionKey.
if( client == null ) {
if (LOG.isDebugEnabled()) {
LOG.debug( "creating new AjaxWebClient in "+sessionKey );
}
client = new AjaxWebClient( request, maximumReadTimeout );
ajaxWebClients.put( sessionKey, client );
}
client.updateLastAccessed();
}
return map;
}
protected boolean isRicoAjax(HttpServletRequest request) {
String rico = request.getParameter("rico");
return rico != null && rico.equals("true");
return client;
}
/**
@ -440,48 +441,34 @@ public class MessageListenerServlet extends MessageServletSupport {
}
return answer;
}
/*
* Listen for available messages and wakeup any continuations.
* an instance of this class runs every minute (started in init), to clean up old web clients & free resources.
*/
private class Listener implements MessageAvailableListener {
WebClient client;
long lastAccess;
Continuation continuation;
Listener(WebClient client) {
this.client = client;
}
public void access() {
lastAccess = System.currentTimeMillis();
}
public synchronized void setContinuation(Continuation continuation) {
this.continuation = continuation;
}
public synchronized void onMessageAvailable(MessageConsumer consumer) {
if (LOG.isDebugEnabled()) {
LOG.debug("message for " + consumer + "continuation=" + continuation);
private class ClientCleaner extends TimerTask {
public void run() {
if( LOG.isDebugEnabled() ) {
LOG.debug( "Cleaning up expired web clients." );
}
if (continuation != null) {
try {
Message message = consumer.receive(10);
continuation.setAttribute("message", message);
continuation.setAttribute("consumer", consumer);
} catch (Exception e) {
LOG.error("Error receiving message " + e, e);
synchronized( ajaxWebClients ) {
Iterator it = ajaxWebClients.entrySet().iterator();
while ( it.hasNext() ) {
Map.Entry<String,AjaxWebClient> e = (Map.Entry<String,AjaxWebClient>)it.next();
String key = e.getKey();
AjaxWebClient val = e.getValue();
if ( LOG.isDebugEnabled() ) {
LOG.debug( "AjaxWebClient " + key + " last accessed " + val.getMillisSinceLastAccessed()/1000 + " seconds ago." );
}
continuation.resume();
} else if (System.currentTimeMillis() - lastAccess > 2 * maximumReadTimeout) {
new Thread() {
public void run() {
client.closeConsumers();
};
}.start();
// close an expired client and remove it from the ajaxWebClients hash.
if( val.closeIfExpired() ) {
if ( LOG.isDebugEnabled() ) {
LOG.debug( "Removing expired AjaxWebClient " + key );
}
it.remove();
}
}
}
}
}
}