mirror of https://github.com/apache/activemq.git
https://issues.apache.org/activemq/browse/AMQ-2728 - cleaning up MessageServlet
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@943467 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
21b81e180f
commit
c980168df5
|
@ -20,8 +20,6 @@ package org.apache.activemq.web;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.PrintWriter;
|
import java.io.PrintWriter;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.LinkedList;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import javax.jms.Destination;
|
import javax.jms.Destination;
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
|
@ -54,6 +52,9 @@ import org.eclipse.jetty.continuation.ContinuationSupport;
|
||||||
* @version $Revision: 1.1.1.1 $
|
* @version $Revision: 1.1.1.1 $
|
||||||
*/
|
*/
|
||||||
public class MessageServlet extends MessageServletSupport {
|
public class MessageServlet extends MessageServletSupport {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 8737914695188481219L;
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(MessageServlet.class);
|
private static final Log LOG = LogFactory.getLog(MessageServlet.class);
|
||||||
|
|
||||||
private String readTimeoutParameter = "readTimeout";
|
private String readTimeoutParameter = "readTimeout";
|
||||||
|
@ -142,7 +143,7 @@ public class MessageServlet extends MessageServletSupport {
|
||||||
* from a queue
|
* from a queue
|
||||||
*/
|
*/
|
||||||
protected void doDelete(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
|
protected void doDelete(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
|
||||||
doMessages(request, response, 1);
|
doMessages(request, response);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -150,7 +151,7 @@ public class MessageServlet extends MessageServletSupport {
|
||||||
* from a queue
|
* from a queue
|
||||||
*/
|
*/
|
||||||
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
|
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
|
||||||
doMessages(request, response, -1);
|
doMessages(request, response);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -161,7 +162,7 @@ public class MessageServlet extends MessageServletSupport {
|
||||||
* @throws ServletException
|
* @throws ServletException
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
protected void doMessages(HttpServletRequest request, HttpServletResponse response, int maxMessages) throws ServletException, IOException {
|
protected void doMessages(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
|
||||||
try {
|
try {
|
||||||
WebClient client = getWebClient(request);
|
WebClient client = getWebClient(request);
|
||||||
Destination destination = getDestination(client, request);
|
Destination destination = getDestination(client, request);
|
||||||
|
@ -174,7 +175,7 @@ public class MessageServlet extends MessageServletSupport {
|
||||||
if (message != null) {
|
if (message != null) {
|
||||||
// we're resuming continuation,
|
// we're resuming continuation,
|
||||||
// so just write the message and return
|
// so just write the message and return
|
||||||
writeResponse(request, response, maxMessages, message, consumer);
|
writeResponse(request, response, message);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
long timeout = getReadTimeout(request);
|
long timeout = getReadTimeout(request);
|
||||||
|
@ -196,7 +197,7 @@ public class MessageServlet extends MessageServletSupport {
|
||||||
continuation = ContinuationSupport.getContinuation(request);
|
continuation = ContinuationSupport.getContinuation(request);
|
||||||
|
|
||||||
if (continuation.isExpired()) {
|
if (continuation.isExpired()) {
|
||||||
response.setStatus(isRicoAjax(request) ? HttpServletResponse.SC_OK : HttpServletResponse.SC_NO_CONTENT);
|
response.setStatus(HttpServletResponse.SC_NO_CONTENT);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -214,33 +215,24 @@ public class MessageServlet extends MessageServletSupport {
|
||||||
listener.setContinuation(continuation);
|
listener.setContinuation(continuation);
|
||||||
}
|
}
|
||||||
|
|
||||||
writeResponse(request, response, maxMessages, message, consumer);
|
writeResponse(request, response, message);
|
||||||
} catch (JMSException e) {
|
} catch (JMSException e) {
|
||||||
throw new ServletException("Could not post JMS message: " + e, e);
|
throw new ServletException("Could not post JMS message: " + e, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void writeResponse(HttpServletRequest request, HttpServletResponse response, int maxMessages, Message message, MessageAvailableConsumer consumer) throws IOException, JMSException {
|
protected void writeResponse(HttpServletRequest request, HttpServletResponse response, Message message) throws IOException, JMSException {
|
||||||
int messages = 0;
|
int messages = 0;
|
||||||
try {
|
try {
|
||||||
boolean ajax = isRicoAjax(request);
|
|
||||||
if (!ajax) {
|
|
||||||
maxMessages = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// write a responds
|
// write a responds
|
||||||
response.setContentType("text/xml");
|
response.setContentType("text/xml");
|
||||||
PrintWriter writer = response.getWriter();
|
PrintWriter writer = response.getWriter();
|
||||||
|
|
||||||
if (ajax) {
|
|
||||||
writer.println("<ajax-response>");
|
|
||||||
}
|
|
||||||
|
|
||||||
// handle any message(s)
|
// handle any message(s)
|
||||||
if (message == null) {
|
if (message == null) {
|
||||||
// No messages so OK response of for ajax else no content.
|
// No messages so OK response of for ajax else no content.
|
||||||
response.setStatus(ajax ? HttpServletResponse.SC_OK
|
response.setStatus(HttpServletResponse.SC_NO_CONTENT);
|
||||||
: HttpServletResponse.SC_NO_CONTENT);
|
|
||||||
} else {
|
} else {
|
||||||
// We have at least one message so set up the response
|
// We have at least one message so set up the response
|
||||||
response.setStatus(HttpServletResponse.SC_OK);
|
response.setStatus(HttpServletResponse.SC_OK);
|
||||||
|
@ -248,142 +240,10 @@ public class MessageServlet extends MessageServletSupport {
|
||||||
if (type != null) {
|
if (type != null) {
|
||||||
response.setContentType(type);
|
response.setContentType(type);
|
||||||
}
|
}
|
||||||
|
|
||||||
// send a response for each available message (up to max
|
setResponseHeaders(response, message);
|
||||||
// messages)
|
writeMessageResponse(writer, message);
|
||||||
while ((maxMessages < 0 || messages < maxMessages)
|
|
||||||
&& message != null) {
|
|
||||||
if (ajax) {
|
|
||||||
writer.print("<response type='object' id='");
|
|
||||||
writer.print(request.getParameter("id"));
|
|
||||||
writer.println("'>");
|
|
||||||
} else {
|
|
||||||
// only ever 1 message for non ajax!
|
|
||||||
setResponseHeaders(response, message);
|
|
||||||
}
|
|
||||||
|
|
||||||
writeMessageResponse(writer, message);
|
|
||||||
|
|
||||||
if (ajax) {
|
|
||||||
writer.println("</response>");
|
|
||||||
}
|
|
||||||
|
|
||||||
// look for next message
|
|
||||||
messages++;
|
|
||||||
if (maxMessages < 0 || messages < maxMessages) {
|
|
||||||
message = consumer.receiveNoWait();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ajax) {
|
|
||||||
writer
|
|
||||||
.println("<response type='object' id='poll'><ok/></response>");
|
|
||||||
writer.println("</ajax-response>");
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Received " + messages + " message(s)");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Reads a message from a destination up to some specific timeout period
|
|
||||||
*
|
|
||||||
* @param request
|
|
||||||
* @param response
|
|
||||||
* @throws ServletException
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
protected void doMessagesWithoutContinuation(HttpServletRequest request, HttpServletResponse response, int maxMessages) throws ServletException, IOException {
|
|
||||||
|
|
||||||
int messages = 0;
|
|
||||||
try {
|
|
||||||
WebClient client = getWebClient(request);
|
|
||||||
Destination destination = getDestination(client, request);
|
|
||||||
long timeout = getReadTimeout(request);
|
|
||||||
boolean ajax = isRicoAjax(request);
|
|
||||||
if (!ajax) {
|
|
||||||
maxMessages = 1;
|
|
||||||
}
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Receiving message(s) from: " + destination + " with timeout: " + timeout);
|
|
||||||
}
|
|
||||||
|
|
||||||
MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName));
|
|
||||||
Message message = null;
|
|
||||||
|
|
||||||
// write a responds
|
|
||||||
response.setContentType("text/xml");
|
|
||||||
PrintWriter writer = response.getWriter();
|
|
||||||
|
|
||||||
if (ajax) {
|
|
||||||
writer.println("<ajax-response>");
|
|
||||||
}
|
|
||||||
|
|
||||||
// Only one client thread at a time should poll for messages.
|
|
||||||
if (client.getSemaphore().tryAcquire()) {
|
|
||||||
try {
|
|
||||||
// Look for any available messages
|
|
||||||
message = consumer.receive(timeout);
|
|
||||||
|
|
||||||
// handle any message(s)
|
|
||||||
if (message == null) {
|
|
||||||
// No messages so OK response of for ajax else no
|
|
||||||
// content.
|
|
||||||
response.setStatus(ajax ? HttpServletResponse.SC_OK : HttpServletResponse.SC_NO_CONTENT);
|
|
||||||
} else {
|
|
||||||
// We have at least one message so set up the
|
|
||||||
// response
|
|
||||||
response.setStatus(HttpServletResponse.SC_OK);
|
|
||||||
String type = getContentType(request);
|
|
||||||
if (type != null) {
|
|
||||||
response.setContentType(type);
|
|
||||||
}
|
|
||||||
|
|
||||||
// send a response for each available message (up to
|
|
||||||
// max
|
|
||||||
// messages)
|
|
||||||
while ((maxMessages < 0 || messages < maxMessages) && message != null) {
|
|
||||||
if (ajax) {
|
|
||||||
writer.print("<response type='object' id='");
|
|
||||||
writer.print(request.getParameter("id"));
|
|
||||||
writer.println("'>");
|
|
||||||
} else {
|
|
||||||
// only ever 1 message for non ajax!
|
|
||||||
setResponseHeaders(response, message);
|
|
||||||
}
|
|
||||||
|
|
||||||
writeMessageResponse(writer, message);
|
|
||||||
|
|
||||||
if (ajax) {
|
|
||||||
writer.println("</response>");
|
|
||||||
}
|
|
||||||
|
|
||||||
// look for next message
|
|
||||||
messages++;
|
|
||||||
if(maxMessages < 0 || messages < maxMessages) {
|
|
||||||
message = consumer.receiveNoWait();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
client.getSemaphore().release();
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Client is using us in another thread.
|
|
||||||
response.setStatus(ajax ? HttpServletResponse.SC_OK : HttpServletResponse.SC_NO_CONTENT);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ajax) {
|
|
||||||
writer.println("<response type='object' id='poll'><ok/></response>");
|
|
||||||
writer.println("</ajax-response>");
|
|
||||||
}
|
|
||||||
|
|
||||||
} catch (JMSException e) {
|
|
||||||
throw new ServletException("Could not post JMS message: " + e, e);
|
|
||||||
} finally {
|
} finally {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Received " + messages + " message(s)");
|
LOG.debug("Received " + messages + " message(s)");
|
||||||
|
@ -472,7 +332,6 @@ public class MessageServlet extends MessageServletSupport {
|
||||||
private static class Listener implements MessageAvailableListener {
|
private static class Listener implements MessageAvailableListener {
|
||||||
MessageConsumer consumer;
|
MessageConsumer consumer;
|
||||||
Continuation continuation;
|
Continuation continuation;
|
||||||
List queue = new LinkedList();
|
|
||||||
|
|
||||||
Listener(MessageConsumer consumer) {
|
Listener(MessageConsumer consumer) {
|
||||||
this.consumer = consumer;
|
this.consumer = consumer;
|
||||||
|
@ -493,7 +352,7 @@ public class MessageServlet extends MessageServletSupport {
|
||||||
Message message = consumer.receiveNoWait();
|
Message message = consumer.receiveNoWait();
|
||||||
continuation.setAttribute("message", message);
|
continuation.setAttribute("message", message);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
e.printStackTrace();
|
LOG.error("Error receiving message " + e, e);
|
||||||
}
|
}
|
||||||
continuation.resume();
|
continuation.resume();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue