diff --git a/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java b/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java index cf2b60414f..ccb9ec8cb3 100644 --- a/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java +++ b/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java @@ -16,25 +16,26 @@ */ package org.apache.activemq.web; +import java.util.Set; + import javax.jms.TextMessage; import javax.management.ObjectName; import org.apache.commons.lang.RandomStringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.eclipse.jetty.client.ContentExchange; import org.eclipse.jetty.client.HttpClient; - -import java.util.Set; +import org.eclipse.jetty.http.HttpStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class RestTest extends JettyTestSupport { private static final Logger LOG = LoggerFactory.getLogger(RestTest.class); - public void testConsume() throws Exception { - producer.send(session.createTextMessage("test")); - LOG.info("message sent"); + public void testConsume() throws Exception { + producer.send(session.createTextMessage("test")); + LOG.info("message sent"); - HttpClient httpClient = new HttpClient(); + HttpClient httpClient = new HttpClient(); httpClient.start(); ContentExchange contentExchange = new ContentExchange(); httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL); @@ -42,9 +43,9 @@ public class RestTest extends JettyTestSupport { httpClient.send(contentExchange); contentExchange.waitForDone(); assertEquals("test", contentExchange.getResponseContent()); - } + } - public void testSubscribeFirst() throws Exception { + public void testSubscribeFirst() throws Exception { HttpClient httpClient = new HttpClient(); httpClient.start(); ContentExchange contentExchange = new ContentExchange(); @@ -59,18 +60,18 @@ public class RestTest extends JettyTestSupport { contentExchange.waitForDone(); assertEquals("test", contentExchange.getResponseContent()); - } + } - public void testSelector() throws Exception { - TextMessage msg1 = session.createTextMessage("test1"); - msg1.setIntProperty("test", 1); - producer.send(msg1); - LOG.info("message 1 sent"); + public void testSelector() throws Exception { + TextMessage msg1 = session.createTextMessage("test1"); + msg1.setIntProperty("test", 1); + producer.send(msg1); + LOG.info("message 1 sent"); - TextMessage msg2 = session.createTextMessage("test2"); - msg2.setIntProperty("test", 2); - producer.send(msg2); - LOG.info("message 2 sent"); + TextMessage msg2 = session.createTextMessage("test2"); + msg2.setIntProperty("test", 2); + producer.send(msg2); + LOG.info("message 2 sent"); HttpClient httpClient = new HttpClient(); httpClient.start(); @@ -81,11 +82,11 @@ public class RestTest extends JettyTestSupport { httpClient.send(contentExchange); contentExchange.waitForDone(); assertEquals("test2", contentExchange.getResponseContent()); - } + } - // test for https://issues.apache.org/activemq/browse/AMQ-2827 - public void testCorrelation() throws Exception { - for (int i = 0; i < 200; i++) { + // test for https://issues.apache.org/activemq/browse/AMQ-2827 + public void testCorrelation() throws Exception { + for (int i = 0; i < 200; i++) { String correlId = "RESTY" + RandomStringUtils.randomNumeric(10); TextMessage message = session.createTextMessage(correlId); @@ -106,8 +107,8 @@ public class RestTest extends JettyTestSupport { LOG.info("Received: [" + contentExchange.getResponseStatus() + "] " + contentExchange.getResponseContent()); assertEquals(200, contentExchange.getResponseStatus()); assertEquals(correlId, contentExchange.getResponseContent()); - } - } + } + } public void testDisconnect() throws Exception { @@ -133,4 +134,23 @@ public class RestTest extends JettyTestSupport { Set subs = broker.getManagementContext().queryNames(query, null); assertEquals("Consumers not closed", 0 , subs.size()); } + + public void testPost() throws Exception { + HttpClient httpClient = new HttpClient(); + httpClient.start(); + ContentExchange contentExchange = new ContentExchange(); + httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL); + contentExchange.setMethod("POST"); + contentExchange.setURL("http://localhost:8080/message/testPost?type=queue"); + httpClient.send(contentExchange); + + contentExchange.waitForDone(); + assertTrue("success status", HttpStatus.isSuccess(contentExchange.getResponseStatus())); + + ContentExchange contentExchange2 = new ContentExchange(); + contentExchange2.setURL("http://localhost:8080/message/testPost?readTimeout=1000&type=Queue"); + httpClient.send(contentExchange2); + contentExchange2.waitForDone(); + assertTrue("success status", HttpStatus.isSuccess(contentExchange2.getResponseStatus())); + } } diff --git a/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java b/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java index 8936e55083..ee890c9302 100644 --- a/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java +++ b/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java @@ -36,10 +36,10 @@ import org.apache.activemq.MessageAvailableConsumer; import org.apache.activemq.MessageAvailableListener; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTextMessage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.eclipse.jetty.continuation.Continuation; import org.eclipse.jetty.continuation.ContinuationSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A servlet for sending and receiving messages to/from JMS destinations using @@ -48,8 +48,8 @@ import org.eclipse.jetty.continuation.ContinuationSupport; * the servlet or as request parameters.

For reading messages you can * specify a readTimeout parameter to determine how long the servlet should * block for. - * - * + * + * */ public class MessageServlet extends MessageServletSupport { @@ -61,7 +61,7 @@ public class MessageServlet extends MessageServletSupport { private long defaultReadTimeout = -1; private long maximumReadTimeout = 20000; private long requestTimeout = 1000; - + private HashMap clients = new HashMap(); public void init() throws ServletException { @@ -76,13 +76,13 @@ public class MessageServlet extends MessageServletSupport { } name = servletConfig.getInitParameter("replyTimeout"); if (name != null) { - requestTimeout = asLong(name); - } + requestTimeout = asLong(name); + } } /** * Sends a message to a destination - * + * * @param request * @param response * @throws ServletException @@ -120,24 +120,24 @@ public class MessageServlet extends MessageServletSupport { TextMessage message = client.getSession().createTextMessage(text); if (sync) { - String point = "activemq:" - + ((ActiveMQDestination)destination).getPhysicalName().replace("//", "") - + "?requestTimeout=" + requestTimeout; + String point = "activemq:" + + ((ActiveMQDestination)destination).getPhysicalName().replace("//", "") + + "?requestTimeout=" + requestTimeout; try { - String body = (String)client.getProducerTemplate().requestBody(point, text); + String body = (String)client.getProducerTemplate().requestBody(point, text); ActiveMQTextMessage answer = new ActiveMQTextMessage(); answer.setText(body); - writeMessageResponse(response.getWriter(), answer); + writeMessageResponse(response.getWriter(), answer); } catch (Exception e) { - IOException ex = new IOException(); - ex.initCause(e); - throw ex; + IOException ex = new IOException(); + ex.initCause(e); + throw ex; } } else { appendParametersToMessage(request, message); boolean persistent = isSendPersistent(request); int priority = getSendPriority(request); - long timeToLive = getSendTimeToLive(request); + long timeToLive = getSendTimeToLive(request); client.send(destination, message, persistent, priority, timeToLive); } @@ -167,7 +167,7 @@ public class MessageServlet extends MessageServletSupport { /** * Reads a message from a destination up to some specific timeout period - * + * * @param request * @param response * @throws ServletException @@ -182,7 +182,7 @@ public class MessageServlet extends MessageServletSupport { } MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName)); Message message = null; - message = (Message)request.getAttribute("message"); + message = (Message)request.getAttribute("message"); if (message != null) { // we're resuming continuation, // so just write the message and return @@ -197,7 +197,7 @@ public class MessageServlet extends MessageServletSupport { Continuation continuation = null; Listener listener = null; - + // Look for any available messages message = consumer.receive(10); @@ -206,7 +206,7 @@ public class MessageServlet extends MessageServletSupport { // no events. if (message == null) { continuation = ContinuationSupport.getContinuation(request); - + if (continuation.isExpired()) { response.setStatus(HttpServletResponse.SC_NO_CONTENT); return; @@ -214,7 +214,7 @@ public class MessageServlet extends MessageServletSupport { continuation.setTimeout(timeout); continuation.suspend(); - + // Fetch the listeners listener = (Listener)consumer.getAvailableListener(); if (listener == null) { @@ -231,7 +231,7 @@ public class MessageServlet extends MessageServletSupport { throw new ServletException("Could not post JMS message: " + e, e); } } - + protected void writeResponse(HttpServletRequest request, HttpServletResponse response, Message message) throws IOException, JMSException { int messages = 0; try { @@ -251,7 +251,7 @@ public class MessageServlet extends MessageServletSupport { if (type != null) { response.setContentType(type); } - + setResponseHeaders(response, message); writeMessageResponse(writer, message); } @@ -266,14 +266,18 @@ public class MessageServlet extends MessageServletSupport { if (message instanceof TextMessage) { TextMessage textMsg = (TextMessage)message; String txt = textMsg.getText(); - if (txt.startsWith("") + 2); + if (txt != null) { + if (txt.startsWith("") + 2); + } + writer.print(txt); } - writer.print(txt); } else if (message instanceof ObjectMessage) { ObjectMessage objectMsg = (ObjectMessage)message; Object object = objectMsg.getObject(); - writer.print(object.toString()); + if (object != null) { + writer.print(object.toString()); + } } } @@ -281,25 +285,25 @@ public class MessageServlet extends MessageServletSupport { String rico = request.getParameter("rico"); return rico != null && rico.equals("true"); } - + public WebClient getWebClient(HttpServletRequest request) { - String clientId = request.getParameter("clientId"); - if (clientId != null) { - synchronized(this) { - LOG.debug("Getting local client [" + clientId + "]"); - WebClient client = clients.get(clientId); - if (client == null) { - LOG.debug("Creating new client [" + clientId + "]"); - client = new WebClient(); - clients.put(clientId, client); - } - return client; - } - - } else { - return WebClient.getWebClient(request); - } - } + String clientId = request.getParameter("clientId"); + if (clientId != null) { + synchronized(this) { + LOG.debug("Getting local client [" + clientId + "]"); + WebClient client = clients.get(clientId); + if (client == null) { + LOG.debug("Creating new client [" + clientId + "]"); + client = new WebClient(); + clients.put(clientId, client); + } + return client; + } + + } else { + return WebClient.getWebClient(request); + } + } protected String getContentType(HttpServletRequest request) { /* @@ -365,7 +369,7 @@ public class MessageServlet extends MessageServletSupport { } catch (Exception e) { LOG.error("Error receiving message " + e, e); } - continuation.resume(); + continuation.resume(); } } }