Was having some wierd issues with the consumer.. disabled continuations for now (but i don't think that was the issue).

added a semaphore to avoid multiple blocking threads (i've got a feeling this might be what fixed it).



git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@356666 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2005-12-14 01:07:08 +00:00
parent 9b1345bb32
commit 829787ac1b
2 changed files with 132 additions and 25 deletions

View File

@ -18,12 +18,10 @@
package org.activemq.web;
import org.activemq.MessageAvailableConsumer;
import org.activemq.MessageAvailableListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.mortbay.util.ajax.Continuation;
import org.mortbay.util.ajax.ContinuationSupport;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.LinkedList;
import java.util.List;
import javax.jms.Destination;
import javax.jms.JMSException;
@ -36,10 +34,12 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.LinkedList;
import java.util.List;
import org.activemq.MessageAvailableConsumer;
import org.activemq.MessageAvailableListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.mortbay.util.ajax.Continuation;
import org.mortbay.util.ajax.ContinuationSupport;
/**
* A servlet for sending and receiving messages to/from JMS destinations using
@ -110,7 +110,7 @@ public class MessageServlet extends MessageServletSupport {
* from a queue
*/
protected void doDelete(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
doMessages(request, response, 1);
doMessagesWithoutContinuation(request, response, 1);
}
/**
@ -118,7 +118,7 @@ public class MessageServlet extends MessageServletSupport {
* from a queue
*/
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
doMessages(request, response, -1);
doMessagesWithoutContinuation(request, response, -1);
}
/**
@ -235,6 +235,106 @@ public class MessageServlet extends MessageServletSupport {
}
}
/**
* Reads a message from a destination up to some specific timeout period
*
* @param request
* @param response
* @throws ServletException
* @throws IOException
*/
protected void doMessagesWithoutContinuation(HttpServletRequest request, HttpServletResponse response,
int maxMessages) throws ServletException, IOException {
int messages = 0;
try {
WebClient client = getWebClient(request);
Destination destination = getDestination(client, request);
long timeout = getReadTimeout(request);
boolean ajax = isRicoAjax(request);
if (!ajax)
maxMessages = 1;
if (log.isDebugEnabled()) {
log.debug("Receiving message(s) from: " + destination + " with timeout: " + timeout);
}
MessageAvailableConsumer consumer = (MessageAvailableConsumer) client.getConsumer(destination);
Continuation continuation = null;
Listener listener = null;
Message message = null;
// write a responds
response.setContentType("text/xml");
PrintWriter writer = response.getWriter();
if (ajax)
writer.println("<ajax-response>");
// Only one client thread at a time should poll for messages.
if (client.getSemaphore().tryAcquire()) {
try {
// Look for any available messages
message = consumer.receive(timeout);
// handle any message(s)
if (message == null) {
// No messages so OK response of for ajax else no
// content.
response.setStatus(ajax ? HttpServletResponse.SC_OK : HttpServletResponse.SC_NO_CONTENT);
} else {
// We have at least one message so set up the
// response
response.setStatus(HttpServletResponse.SC_OK);
String type = getContentType(request);
if (type != null)
response.setContentType(type);
// send a response for each available message (up to
// max
// messages)
while ((maxMessages < 0 || messages < maxMessages) && message != null) {
// System.err.println("message["+messages+"]="+message);
if (ajax) {
writer.print("<response type='object' id='");
writer.print(request.getParameter("id"));
writer.println("'>");
} else
// only ever 1 message for non ajax!
setResponseHeaders(response, message);
writeMessageResponse(writer, message);
if (ajax)
writer.println("</response>");
// look for next message
message = consumer.receiveNoWait();
messages++;
}
}
} finally {
client.getSemaphore().release();
}
} else {
// Client is using us in another thread.
response.setStatus(ajax ? HttpServletResponse.SC_OK : HttpServletResponse.SC_NO_CONTENT);
}
if (ajax) {
writer.println("<response type='object' id='poll'><ok/></response>");
writer.println("</ajax-response>");
}
} catch (JMSException e) {
throw new ServletException("Could not post JMS message: " + e, e);
} finally {
if (log.isDebugEnabled()) {
log.debug("Received " + messages + " message(s)");
}
}
}
protected void writeMessageResponse(PrintWriter writer, Message message) throws JMSException, IOException {
if (message instanceof TextMessage) {
TextMessage textMsg = (TextMessage) message;

View File

@ -18,13 +18,12 @@
package org.activemq.web;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import org.activemq.ActiveMQConnection;
import org.activemq.ActiveMQConnectionFactory;
import org.activemq.ActiveMQSession;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.HashMap;
import java.util.Map;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
@ -40,12 +39,14 @@ import javax.servlet.http.HttpSession;
import javax.servlet.http.HttpSessionActivationListener;
import javax.servlet.http.HttpSessionEvent;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.HashMap;
import java.util.Map;
import org.activemq.ActiveMQConnection;
import org.activemq.ActiveMQConnectionFactory;
import org.activemq.ActiveMQSession;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.Semaphore;
/**
* Represents a messaging client used from inside a web container
@ -72,6 +73,8 @@ public class WebClient implements HttpSessionActivationListener, Externalizable
private transient Map topicConsumers = new ConcurrentHashMap();
private int deliveryMode = DeliveryMode.NON_PERSISTENT;
private final Semaphore semaphore = new Semaphore(1);
/**
* @return the web client for the current HTTP session or null if there is not a web client created yet
@ -247,4 +250,8 @@ public class WebClient implements HttpSessionActivationListener, Externalizable
public Session session;
public MessageConsumer consumer;
}
public Semaphore getSemaphore() {
return semaphore;
}
}