mirror of https://github.com/apache/activemq.git
apply patch with cleanups
This commit is contained in:
parent
e56c062f27
commit
a2ede974b9
|
@ -21,6 +21,7 @@ import java.io.IOException;
|
|||
import java.io.PrintWriter;
|
||||
import java.util.Enumeration;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
|
@ -42,11 +43,18 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
/**
|
||||
* A servlet for sending and receiving messages to/from JMS destinations using
|
||||
* HTTP POST for sending and HTTP GET for receiving. <p/> You can specify the
|
||||
* destination and whether it is a topic or queue via configuration details on
|
||||
* the servlet or as request parameters. <p/> For reading messages you can
|
||||
* specify a readTimeout parameter to determine how long the servlet should
|
||||
* block for.
|
||||
* HTTP POST for sending and HTTP GET for receiving.
|
||||
* <p/>
|
||||
* You can specify the destination and whether it is a topic or queue via
|
||||
* configuration details on the servlet or as request parameters.
|
||||
* <p/>
|
||||
* For reading messages you can specify a readTimeout parameter to determine how
|
||||
* long the servlet should block for.
|
||||
*
|
||||
* One thing to keep in mind with this solution - due to the nature of REST,
|
||||
* there will always be a chance of losing messages. Consider what happens when
|
||||
* a message is retrieved from the broker but the web call is interrupted before
|
||||
* the client receives the message in the response - the message is lost.
|
||||
*/
|
||||
public class MessageServlet extends MessageServletSupport {
|
||||
|
||||
|
@ -59,12 +67,15 @@ public class MessageServlet extends MessageServletSupport {
|
|||
private static final Logger LOG = LoggerFactory.getLogger(MessageServlet.class);
|
||||
|
||||
private final String readTimeoutParameter = "readTimeout";
|
||||
private final String readTimeoutRequestAtt = "xamqReadDeadline";
|
||||
private final String oneShotParameter = "oneShot";
|
||||
private long defaultReadTimeout = -1;
|
||||
private long maximumReadTimeout = 20000;
|
||||
private long requestTimeout = 1000;
|
||||
private String defaultContentType = "application/xml";
|
||||
|
||||
private final HashMap<String, WebClient> clients = new HashMap<String, WebClient>();
|
||||
private final HashSet<MessageAvailableConsumer> activeConsumers = new HashSet<MessageAvailableConsumer>();
|
||||
|
||||
@Override
|
||||
public void init() throws ServletException {
|
||||
|
@ -144,7 +155,7 @@ public class MessageServlet extends MessageServletSupport {
|
|||
}
|
||||
|
||||
/**
|
||||
* Supports a HTTP DELETE to be equivlanent of consuming a singe message
|
||||
* Supports a HTTP DELETE to be equivalent of consuming a singe message
|
||||
* from a queue
|
||||
*/
|
||||
@Override
|
||||
|
@ -153,7 +164,7 @@ public class MessageServlet extends MessageServletSupport {
|
|||
}
|
||||
|
||||
/**
|
||||
* Supports a HTTP DELETE to be equivlanent of consuming a singe message
|
||||
* Supports a HTTP DELETE to be equivalent of consuming a singe message
|
||||
* from a queue
|
||||
*/
|
||||
@Override
|
||||
|
@ -170,69 +181,115 @@ public class MessageServlet extends MessageServletSupport {
|
|||
* @throws IOException
|
||||
*/
|
||||
protected void doMessages(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
|
||||
MessageAvailableConsumer consumer = null;
|
||||
|
||||
try {
|
||||
WebClient client = getWebClient(request);
|
||||
Destination destination = getDestination(client, request);
|
||||
if (destination == null) {
|
||||
throw new NoDestinationSuppliedException();
|
||||
}
|
||||
MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName));
|
||||
Message message = null;
|
||||
message = (Message)request.getAttribute("message");
|
||||
if (message != null) {
|
||||
// we're resuming continuation,
|
||||
// so just write the message and return
|
||||
writeResponse(request, response, message);
|
||||
return;
|
||||
consumer = (MessageAvailableConsumer) client.getConsumer(destination, request.getHeader(WebClient.selectorName));
|
||||
Continuation continuation = ContinuationSupport.getContinuation(request);
|
||||
|
||||
// Don't allow concurrent use of the consumer. Do make sure to allow
|
||||
// subsequent calls on continuation to use the consumer.
|
||||
if (continuation.isInitial()) {
|
||||
synchronized (activeConsumers) {
|
||||
if (activeConsumers.contains(consumer)) {
|
||||
throw new ServletException("Concurrent access to consumer is not supported");
|
||||
} else {
|
||||
activeConsumers.add(consumer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Message message = null;
|
||||
|
||||
long deadline = getReadDeadline(request);
|
||||
long timeout = deadline - System.currentTimeMillis();
|
||||
|
||||
// Set the message available listener *before* calling receive to eliminate any
|
||||
// chance of a missed notification between the time receive() completes without
|
||||
// a message and the time the listener is set.
|
||||
synchronized (consumer) {
|
||||
Listener listener = (Listener) consumer.getAvailableListener();
|
||||
if (listener == null) {
|
||||
listener = new Listener(consumer);
|
||||
consumer.setAvailableListener(listener);
|
||||
}
|
||||
}
|
||||
long timeout = getReadTimeout(request);
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Receiving message(s) from: " + destination + " with timeout: " + timeout);
|
||||
}
|
||||
|
||||
Continuation continuation = null;
|
||||
Listener listener = null;
|
||||
|
||||
// Look for any available messages (need a little timeout)
|
||||
message = consumer.receive(10);
|
||||
|
||||
// Get an existing Continuation or create a new one if there are
|
||||
// no events.
|
||||
if (message == null) {
|
||||
continuation = ContinuationSupport.getContinuation(request);
|
||||
|
||||
if (continuation.isExpired()) {
|
||||
response.setStatus(HttpServletResponse.SC_NO_CONTENT);
|
||||
return;
|
||||
}
|
||||
|
||||
continuation.setTimeout(timeout);
|
||||
continuation.suspend();
|
||||
|
||||
// Fetch the listeners
|
||||
listener = (Listener)consumer.getAvailableListener();
|
||||
if (listener == null) {
|
||||
listener = new Listener(consumer);
|
||||
consumer.setAvailableListener(listener);
|
||||
}
|
||||
|
||||
// register this continuation with our listener.
|
||||
listener.setContinuation(continuation);
|
||||
// Look for any available messages (need a little timeout). Always
|
||||
// try at least one lookup; don't block past the deadline.
|
||||
if (timeout <= 0) {
|
||||
message = consumer.receiveNoWait();
|
||||
} else if (timeout < 10) {
|
||||
message = consumer.receive(timeout);
|
||||
} else {
|
||||
message = consumer.receive(10);
|
||||
}
|
||||
|
||||
writeResponse(request, response, message);
|
||||
if (message == null) {
|
||||
handleContinuation(request, response, client, destination, consumer, deadline);
|
||||
} else {
|
||||
writeResponse(request, response, message);
|
||||
closeConsumerOnOneShot(request, client, destination);
|
||||
|
||||
synchronized (activeConsumers) {
|
||||
activeConsumers.remove(consumer);
|
||||
}
|
||||
}
|
||||
} catch (JMSException e) {
|
||||
throw new ServletException("Could not post JMS message: " + e, e);
|
||||
}
|
||||
}
|
||||
|
||||
protected void handleContinuation(HttpServletRequest request, HttpServletResponse response, WebClient client, Destination destination,
|
||||
MessageAvailableConsumer consumer, long deadline) {
|
||||
// Get an existing Continuation or create a new one if there are no events.
|
||||
Continuation continuation = ContinuationSupport.getContinuation(request);
|
||||
|
||||
long timeout = deadline - System.currentTimeMillis();
|
||||
if ((continuation.isExpired()) || (timeout <= 0)) {
|
||||
// Reset the continuation on the available listener for the consumer to prevent the
|
||||
// next message receipt from being consumed without a valid, active continuation.
|
||||
synchronized (consumer) {
|
||||
Object obj = consumer.getAvailableListener();
|
||||
if (obj instanceof Listener) {
|
||||
((Listener) obj).setContinuation(null);
|
||||
}
|
||||
}
|
||||
response.setStatus(HttpServletResponse.SC_NO_CONTENT);
|
||||
closeConsumerOnOneShot(request, client, destination);
|
||||
synchronized (activeConsumers) {
|
||||
activeConsumers.remove(consumer);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
continuation.setTimeout(timeout);
|
||||
continuation.suspend();
|
||||
|
||||
synchronized (consumer) {
|
||||
Listener listener = (Listener) consumer.getAvailableListener();
|
||||
|
||||
// register this continuation with our listener.
|
||||
listener.setContinuation(continuation);
|
||||
}
|
||||
}
|
||||
|
||||
protected void writeResponse(HttpServletRequest request, HttpServletResponse response, Message message) throws IOException, JMSException {
|
||||
int messages = 0;
|
||||
try {
|
||||
response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate"); // HTTP 1.1
|
||||
response.setHeader("Pragma", "no-cache"); // HTTP 1.0
|
||||
response.setDateHeader("Expires", 0);
|
||||
response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate"); // HTTP
|
||||
// 1.1
|
||||
response.setHeader("Pragma", "no-cache"); // HTTP 1.0
|
||||
response.setDateHeader("Expires", 0);
|
||||
// write a responds
|
||||
PrintWriter writer = response.getWriter();
|
||||
|
||||
|
@ -269,7 +326,7 @@ public class MessageServlet extends MessageServletSupport {
|
|||
|
||||
protected void writeMessageResponse(PrintWriter writer, Message message) throws JMSException, IOException {
|
||||
if (message instanceof TextMessage) {
|
||||
TextMessage textMsg = (TextMessage)message;
|
||||
TextMessage textMsg = (TextMessage) message;
|
||||
String txt = textMsg.getText();
|
||||
if (txt != null) {
|
||||
if (txt.startsWith("<?")) {
|
||||
|
@ -278,7 +335,7 @@ public class MessageServlet extends MessageServletSupport {
|
|||
writer.print(txt);
|
||||
}
|
||||
} else if (message instanceof ObjectMessage) {
|
||||
ObjectMessage objectMsg = (ObjectMessage)message;
|
||||
ObjectMessage objectMsg = (ObjectMessage) message;
|
||||
Object object = objectMsg.getObject();
|
||||
if (object != null) {
|
||||
writer.print(object.toString());
|
||||
|
@ -288,7 +345,7 @@ public class MessageServlet extends MessageServletSupport {
|
|||
|
||||
protected boolean isXmlContent(Message message) throws JMSException {
|
||||
if (message instanceof TextMessage) {
|
||||
TextMessage textMsg = (TextMessage)message;
|
||||
TextMessage textMsg = (TextMessage) message;
|
||||
String txt = textMsg.getText();
|
||||
if (txt != null) {
|
||||
// assume its xml when it starts with <
|
||||
|
@ -304,7 +361,7 @@ public class MessageServlet extends MessageServletSupport {
|
|||
public WebClient getWebClient(HttpServletRequest request) {
|
||||
String clientId = request.getParameter("clientId");
|
||||
if (clientId != null) {
|
||||
synchronized(this) {
|
||||
synchronized (this) {
|
||||
LOG.debug("Getting local client [" + clientId + "]");
|
||||
WebClient client = clients.get(clientId);
|
||||
if (client == null) {
|
||||
|
@ -338,9 +395,9 @@ public class MessageServlet extends MessageServletSupport {
|
|||
response.setHeader("id", message.getJMSMessageID());
|
||||
|
||||
// Return JMS properties as header values.
|
||||
for(Enumeration names = message.getPropertyNames(); names.hasMoreElements();) {
|
||||
for (Enumeration names = message.getPropertyNames(); names.hasMoreElements();) {
|
||||
String name = (String) names.nextElement();
|
||||
response.setHeader(name , message.getObjectProperty(name).toString());
|
||||
response.setHeader(name, message.getObjectProperty(name).toString());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -348,17 +405,37 @@ public class MessageServlet extends MessageServletSupport {
|
|||
* @return the timeout value for read requests which is always >= 0 and <=
|
||||
* maximumReadTimeout to avoid DoS attacks
|
||||
*/
|
||||
protected long getReadTimeout(HttpServletRequest request) {
|
||||
long answer = defaultReadTimeout;
|
||||
protected long getReadDeadline(HttpServletRequest request) {
|
||||
Long answer;
|
||||
|
||||
String name = request.getParameter(readTimeoutParameter);
|
||||
if (name != null) {
|
||||
answer = asLong(name);
|
||||
answer = (Long) request.getAttribute(readTimeoutRequestAtt);
|
||||
|
||||
if (answer == null) {
|
||||
long timeout = defaultReadTimeout;
|
||||
String name = request.getParameter(readTimeoutParameter);
|
||||
if (name != null) {
|
||||
timeout = asLong(name);
|
||||
}
|
||||
if (timeout < 0 || timeout > maximumReadTimeout) {
|
||||
timeout = maximumReadTimeout;
|
||||
}
|
||||
|
||||
answer = Long.valueOf(System.currentTimeMillis() + timeout);
|
||||
}
|
||||
if (answer < 0 || answer > maximumReadTimeout) {
|
||||
answer = maximumReadTimeout;
|
||||
return answer.longValue();
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the consumer if one-shot mode is used on the given request.
|
||||
*/
|
||||
protected void closeConsumerOnOneShot(HttpServletRequest request, WebClient client, Destination dest) {
|
||||
if (asBoolean(request.getParameter(oneShotParameter), false)) {
|
||||
try {
|
||||
client.closeConsumer(dest);
|
||||
} catch (JMSException jms_exc) {
|
||||
LOG.warn("JMS exception on closing consumer after request with one-shot mode", jms_exc);
|
||||
}
|
||||
}
|
||||
return answer;
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -386,17 +463,9 @@ public class MessageServlet extends MessageServletSupport {
|
|||
|
||||
synchronized (this.consumer) {
|
||||
if (continuation != null) {
|
||||
try {
|
||||
Message message = consumer.receiveNoWait();
|
||||
continuation.setAttribute("message", message);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Error receiving message due " + e.getMessage() + ". This exception is ignored.", e);
|
||||
} finally {
|
||||
continuation.resume();
|
||||
}
|
||||
continuation.resume();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue