non static queue consumers

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@389757 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gregory John Wilkins 2006-03-29 11:29:56 +00:00
parent a094f32462
commit 4060ad9231
5 changed files with 152 additions and 183 deletions

View File

@ -1,49 +0,0 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.web;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.servlet.http.HttpSessionEvent;
import javax.servlet.http.HttpSessionListener;
/**
* Listens to sessions closing to ensure that JMS connections are
* cleaned up nicely
*
* @version $Revision: 1.1.1.1 $
*/
public class ConnectionManager implements HttpSessionListener {
private static final Log log = LogFactory.getLog(ConnectionManager.class);
public void sessionCreated(HttpSessionEvent event) {
}
public void sessionDestroyed(HttpSessionEvent event) {
/** TODO we can't use the session any more now!
WebClient client = WebClient.getWebClient(event.getSession());
try {
client.stop();
}
catch (JMSException e) {
log.warn("Error closing connection: " + e, e);
}
*/
}
}

View File

@ -55,6 +55,14 @@ import org.mortbay.util.ajax.ContinuationSupport;
* specify a readTimeout parameter to determine how long the servlet should
* block for.
*
* The servlet can be configured with the following init parameters:<dl>
* <dt>defaultReadTimeout</dt><dd>The default time in ms to wait for messages. May be overridden by a request using the 'timeout' parameter</dd>
* <dt>maximumReadTimeout</dt><dd>The maximum value a request may specify for the 'timeout' parameter</dd>
* <dt>maximumMessages</dt><dd>maximum messages to send per response</dd>
* <dt></dt><dd></dd>
* </dl>
*
*
* @version $Revision: 1.1.1.1 $
*/
public class MessageListenerServlet extends MessageServletSupport {
@ -132,6 +140,7 @@ public class MessageListenerServlet extends MessageServletSupport {
{
Listener listener = getListener(request);
Map consumerIdMap = getConsumerIdMap(request);
client.closeConsumer(destination); // drop any existing consumer.
MessageAvailableConsumer consumer = (MessageAvailableConsumer) client.getConsumer(destination);
consumer.setAvailableListener(listener);
@ -145,9 +154,9 @@ public class MessageListenerServlet extends MessageServletSupport {
Map consumerIdMap = getConsumerIdMap(request);
MessageAvailableConsumer consumer = (MessageAvailableConsumer) client.getConsumer(destination);
// TODO should we destroy consumer on unsubscribe?
consumer.setAvailableListener(null);
consumerIdMap.remove(consumer);
client.closeConsumer(destination);
if (log.isDebugEnabled()) {
log.debug("Unsubscribed: "+consumer);
}
@ -228,7 +237,6 @@ public class MessageListenerServlet extends MessageServletSupport {
catch (JMSException e) {
throw new ServletException("JMS problem: " + e, e);
}
// System.err.println("--");
}
/**
@ -251,11 +259,12 @@ public class MessageListenerServlet extends MessageServletSupport {
log.debug("doMessage timeout="+timeout);
}
Continuation continuation = null;
Message message = null;
Continuation continuation = ContinuationSupport.getContinuation(request, client);
Listener listener = getListener(request);
if (listener!=null && continuation!=null && !continuation.isPending())
listener.access();
Message message = null;
synchronized (client) {
List consumers = client.getConsumers();
@ -278,8 +287,6 @@ public class MessageListenerServlet extends MessageServletSupport {
// messages
if (message == null) {
continuation = ContinuationSupport.getContinuation(request, client);
// register this continuation with our listener.
listener.setContinuation(continuation);
@ -287,6 +294,7 @@ public class MessageListenerServlet extends MessageServletSupport {
// request here).
continuation.suspend(timeout);
}
listener.setContinuation(null);
// prepare the responds
response.setContentType("text/xml");
@ -318,7 +326,6 @@ public class MessageListenerServlet extends MessageServletSupport {
// Look for any available messages
message = consumer.receiveNoWait();
// System.err.println("received "+message+" from "+consumer);
while (message != null && messages < maximumMessages) {
String id = (String) consumerIdMap.get(consumer);
writer.print("<response id='");
@ -333,6 +340,7 @@ public class MessageListenerServlet extends MessageServletSupport {
// Add poll message
// writer.println("<response type='object' id='amqPoll'><ok/></response>");
writer.print("</ajax-response>");
writer.flush();
@ -405,15 +413,18 @@ public class MessageListenerServlet extends MessageServletSupport {
*/
private class Listener implements MessageAvailableListener {
WebClient client;
long lastAccess;
Continuation continuation;
List queue = new LinkedList();
Listener(WebClient client) {
this.client = client;
}
public void access()
{
lastAccess=System.currentTimeMillis();
}
public void setContinuation(Continuation continuation) {
synchronized (client) {
this.continuation = continuation;
@ -427,21 +438,13 @@ public class MessageListenerServlet extends MessageServletSupport {
}
if (continuation != null)
continuation.resume();
else if (System.currentTimeMillis()-lastAccess>2*maximumReadTimeout)
{
client.closeConsumers();
}
continuation = null;
}
}
}
private static void dump(Map map)
{
Iterator iter=map.entrySet().iterator();
while(iter.hasNext())
{
Map.Entry entry=(Map.Entry)iter.next();
String k=(String)entry.getKey();
String[] v=(String[])entry.getValue();
System.err.println(k+":"+(v==null?"[]":Arrays.asList(v).toString()));
}
}
}

View File

@ -38,6 +38,11 @@ import java.util.Map;
* there are various ways to map JMS operations to web requests
* so we put most of the common behaviour in a reusable base class.
*
* This servlet can be configured with the following init paramters <dl>
* <dt>topic</dt><dd>Set to 'true' if the servle should default to using topics rather than channels</dd>
* <dt>destination</dt><dd>The default destination to use if one is not specifiied</dd>
* <dt></dt><dd></dd>
* </dl>
* @version $Revision: 1.1.1.1 $
*/
public abstract class MessageServletSupport extends HttpServlet {
@ -74,7 +79,7 @@ public abstract class MessageServletSupport extends HttpServlet {
}
protected WebClient createWebClient(HttpServletRequest request) {
return new WebClient(getServletContext());
return new WebClient();
}
public static boolean asBoolean(String param) {
@ -99,7 +104,7 @@ public abstract class MessageServletSupport extends HttpServlet {
protected WebClient getWebClient(HttpServletRequest request) {
HttpSession session = request.getSession(true);
WebClient client = WebClient.getWebClient(session);
if (client == null) {
if (client == null || client.isClosed()) {
client = createWebClient(request);
session.setAttribute(WebClient.webClientAttribute, client);
}

View File

@ -35,43 +35,45 @@ import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpSession;
import javax.servlet.http.HttpSessionActivationListener;
import javax.servlet.http.HttpSessionBindingEvent;
import javax.servlet.http.HttpSessionBindingListener;
import javax.servlet.http.HttpSessionEvent;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.MessageAvailableConsumer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.Semaphore;
/**
* Represents a messaging client used from inside a web container
* typically stored inside a HttpSession
*
* TODO controls to prevent DOS attacks with users requesting many consumers
* TODO configure consumers with small prefetch.
*
* @version $Revision: 1.1.1.1 $
*/
public class WebClient implements HttpSessionActivationListener, Externalizable {
public class WebClient implements HttpSessionActivationListener, HttpSessionBindingListener, Externalizable {
public static final String webClientAttribute = "org.apache.activemq.webclient";
public static final String connectionFactoryAttribute = "org.apache.activemq.connectionFactory";
public static final String queueConsumersAttribute = "org.apache.activemq.queueConsumers";
public static final String brokerUrlInitParam = "org.apache.activemq.brokerURL";
private static final Log log = LogFactory.getLog(WebClient.class);
private static transient ConnectionFactory factory;
private static transient Map queueConsumers;
private transient ServletContext context;
private transient Map consumers = new HashMap();
private transient ActiveMQConnection connection;
private transient ActiveMQSession session;
private transient MessageProducer producer;
private transient Map topicConsumers = new ConcurrentHashMap();
private int deliveryMode = DeliveryMode.NON_PERSISTENT;
private final Semaphore semaphore = new Semaphore(1);
@ -86,24 +88,14 @@ public class WebClient implements HttpSessionActivationListener, Externalizable
public static void initContext(ServletContext context) {
factory = initConnectionFactory(context);
if (factory == null) {
log.warn("No ConnectionFactory available in the ServletContext for: " + connectionFactoryAttribute);
factory = new ActiveMQConnectionFactory("vm://localhost");
context.setAttribute(connectionFactoryAttribute, factory);
}
queueConsumers = initQueueConsumers(context);
initConnectionFactory(context);
}
/**
* Only called by serialization
*/
public WebClient() {
}
public WebClient(ServletContext context) {
this.context = context;
initContext(context);
if (factory==null)
throw new IllegalStateException("initContext(ServletContext) not called");
}
@ -117,28 +109,80 @@ public class WebClient implements HttpSessionActivationListener, Externalizable
}
public void start() throws JMSException {
public synchronized void closeConsumers()
{
for (Iterator it = consumers.values().iterator(); it.hasNext();) {
MessageConsumer consumer = (MessageConsumer) it.next();
it.remove();
try{
consumer.setMessageListener(null);
if (consumer instanceof MessageAvailableConsumer)
((MessageAvailableConsumer)consumer).setAvailableListener(null);
consumer.close();
}
catch(JMSException e)
{
e.printStackTrace();
}
}
}
public void stop() throws JMSException {
System.out.println("Closing the WebClient!!! " + this);
public synchronized void close() {
try {
connection.close();
closeConsumers();
if (connection!=null)
connection.close();
} catch (JMSException e) {
throw new RuntimeException(e);
}
finally {
producer = null;
session = null;
connection = null;
topicConsumers.clear();
if (consumers!=null)
consumers.clear();
consumers=null;
}
}
public boolean isClosed()
{
return consumers==null;
}
public void writeExternal(ObjectOutput out) throws IOException {
if (consumers!=null)
{
out.write(consumers.size());
Iterator i=consumers.keySet().iterator();
while(i.hasNext())
out.writeObject(i.next().toString());
}
else
out.write(-1);
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
topicConsumers = new HashMap();
int size = in.readInt();
if (size >=0) {
consumers = new HashMap();
for (int i=0;i<size;i++) {
String destinationName = in.readObject().toString();
try{
Destination destination = destinationName.startsWith("topic://")
?(Destination)getSession().createTopic(destinationName)
:(Destination)getSession().createQueue(destinationName);
consumers.put(destination,getConsumer(destination, true));
}
catch (JMSException e)
{
e.printStackTrace(); // TODO better handling?
}
}
}
}
public void send(Destination destination, Message message) throws JMSException {
@ -167,103 +211,75 @@ public class WebClient implements HttpSessionActivationListener, Externalizable
return connection;
}
public void sessionWillPassivate(HttpSessionEvent event) {
try {
stop();
}
catch (JMSException e) {
log.warn("Could not close connection: " + e, e);
}
}
public void sessionDidActivate(HttpSessionEvent event) {
// lets update the connection factory from the servlet context
context = event.getSession().getServletContext();
initContext(context);
}
public static Map initQueueConsumers(ServletContext context) {
Map answer = (Map) context.getAttribute(queueConsumersAttribute);
if (answer == null) {
answer = new HashMap();
context.setAttribute(queueConsumersAttribute, answer);
}
return answer;
}
public static ConnectionFactory initConnectionFactory(ServletContext servletContext) {
ConnectionFactory connectionFactory = (ConnectionFactory) servletContext.getAttribute(connectionFactoryAttribute);
if (connectionFactory == null) {
String brokerURL = (String) servletContext.getInitParameter(brokerUrlInitParam);
public static synchronized void initConnectionFactory(ServletContext servletContext) {
if (factory==null)
factory = (ConnectionFactory) servletContext.getAttribute(connectionFactoryAttribute);
if (factory == null) {
String brokerURL = servletContext.getInitParameter(brokerUrlInitParam);
servletContext.log("Value of: " + brokerUrlInitParam + " is: " + brokerURL);
if (brokerURL == null) {
brokerURL = "vm://localhost";
}
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL);
connectionFactory = factory;
servletContext.setAttribute(connectionFactoryAttribute, connectionFactory);
ActiveMQConnectionFactory amqfactory = new ActiveMQConnectionFactory(brokerURL);
factory = amqfactory;
servletContext.setAttribute(connectionFactoryAttribute, factory);
}
return connectionFactory;
}
public synchronized MessageConsumer getConsumer(Destination destination) throws JMSException {
if (destination instanceof Topic) {
MessageConsumer consumer = (MessageConsumer) topicConsumers.get(destination);
if (consumer == null) {
consumer = getSession().createConsumer(destination);
topicConsumers.put(destination, consumer);
}
return consumer;
return getConsumer(destination,true);
}
public synchronized MessageConsumer getConsumer(Destination destination, boolean create) throws JMSException {
MessageConsumer consumer = (MessageConsumer) consumers.get(destination);
if (create && consumer == null) {
consumer = getSession().createConsumer(destination);
consumers.put(destination, consumer);
}
else {
synchronized (queueConsumers) {
SessionConsumerPair pair = (SessionConsumerPair) queueConsumers.get(destination);
if (pair == null) {
pair = createSessionConsumerPair(destination);
queueConsumers.put(destination, pair);
}
return pair.consumer;
}
return consumer;
}
public synchronized void closeConsumer(Destination destination) throws JMSException {
MessageConsumer consumer = (MessageConsumer) consumers.get(destination);
if (consumer != null) {
consumers.remove(destination);
consumer.setMessageListener(null);
if (consumer instanceof MessageAvailableConsumer)
((MessageAvailableConsumer)consumer).setAvailableListener(null);
consumer.close();
}
}
public synchronized List getConsumers()
{
ArrayList list = new ArrayList(topicConsumers.size()+queueConsumers.size());
// TODO check this double synchronization on queue but not on topics
synchronized (queueConsumers) {
for (Iterator it = queueConsumers.values().iterator(); it.hasNext();) {
SessionConsumerPair pair = (SessionConsumerPair) it.next();
list.add(pair.consumer);
}
}
list.addAll(topicConsumers.values());
return list;
return new ArrayList(consumers.values());
}
protected ActiveMQSession createSession() throws JMSException {
return (ActiveMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
}
protected SessionConsumerPair createSessionConsumerPair(Destination destination) throws JMSException {
SessionConsumerPair answer = new SessionConsumerPair();
answer.session = createSession();
answer.consumer = answer.session.createConsumer(destination);
return answer;
}
protected static class SessionConsumerPair {
public Session session;
public MessageConsumer consumer;
}
public Semaphore getSemaphore() {
return semaphore;
}
public void sessionWillPassivate(HttpSessionEvent event) {
close();
}
public void sessionDidActivate(HttpSessionEvent event) {
}
public void valueBound(HttpSessionBindingEvent event) {
}
public void valueUnbound(HttpSessionBindingEvent event) {
close();
}
}

View File

@ -38,12 +38,6 @@
<description>Whether we should include an embedded broker or not</description>
</context-param>
<!-- connection manager -->
<listener>
<listener-class>org.apache.activemq.web.ConnectionManager</listener-class>
</listener>
<!-- servlet mappings -->
<!-- the subscription REST servlet -->