apply additional fix and unit test for: https://issues.apache.org/jira/browse/AMQ-3856

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1343742 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-05-29 14:20:44 +00:00
parent 4d43c74fd2
commit 7504bc7a01
2 changed files with 97 additions and 73 deletions

View File

@ -16,25 +16,26 @@
*/ */
package org.apache.activemq.web; package org.apache.activemq.web;
import java.util.Set;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import javax.management.ObjectName; import javax.management.ObjectName;
import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.lang.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.eclipse.jetty.client.ContentExchange; import org.eclipse.jetty.client.ContentExchange;
import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.http.HttpStatus;
import java.util.Set; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RestTest extends JettyTestSupport { public class RestTest extends JettyTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(RestTest.class); private static final Logger LOG = LoggerFactory.getLogger(RestTest.class);
public void testConsume() throws Exception { public void testConsume() throws Exception {
producer.send(session.createTextMessage("test")); producer.send(session.createTextMessage("test"));
LOG.info("message sent"); LOG.info("message sent");
HttpClient httpClient = new HttpClient(); HttpClient httpClient = new HttpClient();
httpClient.start(); httpClient.start();
ContentExchange contentExchange = new ContentExchange(); ContentExchange contentExchange = new ContentExchange();
httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL); httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
@ -42,9 +43,9 @@ public class RestTest extends JettyTestSupport {
httpClient.send(contentExchange); httpClient.send(contentExchange);
contentExchange.waitForDone(); contentExchange.waitForDone();
assertEquals("test", contentExchange.getResponseContent()); assertEquals("test", contentExchange.getResponseContent());
} }
public void testSubscribeFirst() throws Exception { public void testSubscribeFirst() throws Exception {
HttpClient httpClient = new HttpClient(); HttpClient httpClient = new HttpClient();
httpClient.start(); httpClient.start();
ContentExchange contentExchange = new ContentExchange(); ContentExchange contentExchange = new ContentExchange();
@ -59,18 +60,18 @@ public class RestTest extends JettyTestSupport {
contentExchange.waitForDone(); contentExchange.waitForDone();
assertEquals("test", contentExchange.getResponseContent()); assertEquals("test", contentExchange.getResponseContent());
} }
public void testSelector() throws Exception { public void testSelector() throws Exception {
TextMessage msg1 = session.createTextMessage("test1"); TextMessage msg1 = session.createTextMessage("test1");
msg1.setIntProperty("test", 1); msg1.setIntProperty("test", 1);
producer.send(msg1); producer.send(msg1);
LOG.info("message 1 sent"); LOG.info("message 1 sent");
TextMessage msg2 = session.createTextMessage("test2"); TextMessage msg2 = session.createTextMessage("test2");
msg2.setIntProperty("test", 2); msg2.setIntProperty("test", 2);
producer.send(msg2); producer.send(msg2);
LOG.info("message 2 sent"); LOG.info("message 2 sent");
HttpClient httpClient = new HttpClient(); HttpClient httpClient = new HttpClient();
httpClient.start(); httpClient.start();
@ -81,11 +82,11 @@ public class RestTest extends JettyTestSupport {
httpClient.send(contentExchange); httpClient.send(contentExchange);
contentExchange.waitForDone(); contentExchange.waitForDone();
assertEquals("test2", contentExchange.getResponseContent()); assertEquals("test2", contentExchange.getResponseContent());
} }
// test for https://issues.apache.org/activemq/browse/AMQ-2827 // test for https://issues.apache.org/activemq/browse/AMQ-2827
public void testCorrelation() throws Exception { public void testCorrelation() throws Exception {
for (int i = 0; i < 200; i++) { for (int i = 0; i < 200; i++) {
String correlId = "RESTY" + RandomStringUtils.randomNumeric(10); String correlId = "RESTY" + RandomStringUtils.randomNumeric(10);
TextMessage message = session.createTextMessage(correlId); TextMessage message = session.createTextMessage(correlId);
@ -106,8 +107,8 @@ public class RestTest extends JettyTestSupport {
LOG.info("Received: [" + contentExchange.getResponseStatus() + "] " + contentExchange.getResponseContent()); LOG.info("Received: [" + contentExchange.getResponseStatus() + "] " + contentExchange.getResponseContent());
assertEquals(200, contentExchange.getResponseStatus()); assertEquals(200, contentExchange.getResponseStatus());
assertEquals(correlId, contentExchange.getResponseContent()); assertEquals(correlId, contentExchange.getResponseContent());
} }
} }
public void testDisconnect() throws Exception { public void testDisconnect() throws Exception {
@ -133,4 +134,23 @@ public class RestTest extends JettyTestSupport {
Set<ObjectName> subs = broker.getManagementContext().queryNames(query, null); Set<ObjectName> subs = broker.getManagementContext().queryNames(query, null);
assertEquals("Consumers not closed", 0 , subs.size()); assertEquals("Consumers not closed", 0 , subs.size());
} }
public void testPost() throws Exception {
HttpClient httpClient = new HttpClient();
httpClient.start();
ContentExchange contentExchange = new ContentExchange();
httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
contentExchange.setMethod("POST");
contentExchange.setURL("http://localhost:8080/message/testPost?type=queue");
httpClient.send(contentExchange);
contentExchange.waitForDone();
assertTrue("success status", HttpStatus.isSuccess(contentExchange.getResponseStatus()));
ContentExchange contentExchange2 = new ContentExchange();
contentExchange2.setURL("http://localhost:8080/message/testPost?readTimeout=1000&type=Queue");
httpClient.send(contentExchange2);
contentExchange2.waitForDone();
assertTrue("success status", HttpStatus.isSuccess(contentExchange2.getResponseStatus()));
}
} }

View File

@ -36,10 +36,10 @@ import org.apache.activemq.MessageAvailableConsumer;
import org.apache.activemq.MessageAvailableListener; import org.apache.activemq.MessageAvailableListener;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.ActiveMQTextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.eclipse.jetty.continuation.Continuation; import org.eclipse.jetty.continuation.Continuation;
import org.eclipse.jetty.continuation.ContinuationSupport; import org.eclipse.jetty.continuation.ContinuationSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* A servlet for sending and receiving messages to/from JMS destinations using * A servlet for sending and receiving messages to/from JMS destinations using
@ -48,8 +48,8 @@ import org.eclipse.jetty.continuation.ContinuationSupport;
* the servlet or as request parameters. <p/> For reading messages you can * the servlet or as request parameters. <p/> For reading messages you can
* specify a readTimeout parameter to determine how long the servlet should * specify a readTimeout parameter to determine how long the servlet should
* block for. * block for.
* *
* *
*/ */
public class MessageServlet extends MessageServletSupport { public class MessageServlet extends MessageServletSupport {
@ -61,7 +61,7 @@ public class MessageServlet extends MessageServletSupport {
private long defaultReadTimeout = -1; private long defaultReadTimeout = -1;
private long maximumReadTimeout = 20000; private long maximumReadTimeout = 20000;
private long requestTimeout = 1000; private long requestTimeout = 1000;
private HashMap<String, WebClient> clients = new HashMap<String, WebClient>(); private HashMap<String, WebClient> clients = new HashMap<String, WebClient>();
public void init() throws ServletException { public void init() throws ServletException {
@ -76,13 +76,13 @@ public class MessageServlet extends MessageServletSupport {
} }
name = servletConfig.getInitParameter("replyTimeout"); name = servletConfig.getInitParameter("replyTimeout");
if (name != null) { if (name != null) {
requestTimeout = asLong(name); requestTimeout = asLong(name);
} }
} }
/** /**
* Sends a message to a destination * Sends a message to a destination
* *
* @param request * @param request
* @param response * @param response
* @throws ServletException * @throws ServletException
@ -120,24 +120,24 @@ public class MessageServlet extends MessageServletSupport {
TextMessage message = client.getSession().createTextMessage(text); TextMessage message = client.getSession().createTextMessage(text);
if (sync) { if (sync) {
String point = "activemq:" String point = "activemq:"
+ ((ActiveMQDestination)destination).getPhysicalName().replace("//", "") + ((ActiveMQDestination)destination).getPhysicalName().replace("//", "")
+ "?requestTimeout=" + requestTimeout; + "?requestTimeout=" + requestTimeout;
try { try {
String body = (String)client.getProducerTemplate().requestBody(point, text); String body = (String)client.getProducerTemplate().requestBody(point, text);
ActiveMQTextMessage answer = new ActiveMQTextMessage(); ActiveMQTextMessage answer = new ActiveMQTextMessage();
answer.setText(body); answer.setText(body);
writeMessageResponse(response.getWriter(), answer); writeMessageResponse(response.getWriter(), answer);
} catch (Exception e) { } catch (Exception e) {
IOException ex = new IOException(); IOException ex = new IOException();
ex.initCause(e); ex.initCause(e);
throw ex; throw ex;
} }
} else { } else {
appendParametersToMessage(request, message); appendParametersToMessage(request, message);
boolean persistent = isSendPersistent(request); boolean persistent = isSendPersistent(request);
int priority = getSendPriority(request); int priority = getSendPriority(request);
long timeToLive = getSendTimeToLive(request); long timeToLive = getSendTimeToLive(request);
client.send(destination, message, persistent, priority, timeToLive); client.send(destination, message, persistent, priority, timeToLive);
} }
@ -167,7 +167,7 @@ public class MessageServlet extends MessageServletSupport {
/** /**
* Reads a message from a destination up to some specific timeout period * Reads a message from a destination up to some specific timeout period
* *
* @param request * @param request
* @param response * @param response
* @throws ServletException * @throws ServletException
@ -182,7 +182,7 @@ public class MessageServlet extends MessageServletSupport {
} }
MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName)); MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName));
Message message = null; Message message = null;
message = (Message)request.getAttribute("message"); message = (Message)request.getAttribute("message");
if (message != null) { if (message != null) {
// we're resuming continuation, // we're resuming continuation,
// so just write the message and return // so just write the message and return
@ -197,7 +197,7 @@ public class MessageServlet extends MessageServletSupport {
Continuation continuation = null; Continuation continuation = null;
Listener listener = null; Listener listener = null;
// Look for any available messages // Look for any available messages
message = consumer.receive(10); message = consumer.receive(10);
@ -206,7 +206,7 @@ public class MessageServlet extends MessageServletSupport {
// no events. // no events.
if (message == null) { if (message == null) {
continuation = ContinuationSupport.getContinuation(request); continuation = ContinuationSupport.getContinuation(request);
if (continuation.isExpired()) { if (continuation.isExpired()) {
response.setStatus(HttpServletResponse.SC_NO_CONTENT); response.setStatus(HttpServletResponse.SC_NO_CONTENT);
return; return;
@ -214,7 +214,7 @@ public class MessageServlet extends MessageServletSupport {
continuation.setTimeout(timeout); continuation.setTimeout(timeout);
continuation.suspend(); continuation.suspend();
// Fetch the listeners // Fetch the listeners
listener = (Listener)consumer.getAvailableListener(); listener = (Listener)consumer.getAvailableListener();
if (listener == null) { if (listener == null) {
@ -231,7 +231,7 @@ public class MessageServlet extends MessageServletSupport {
throw new ServletException("Could not post JMS message: " + e, e); throw new ServletException("Could not post JMS message: " + e, e);
} }
} }
protected void writeResponse(HttpServletRequest request, HttpServletResponse response, Message message) throws IOException, JMSException { protected void writeResponse(HttpServletRequest request, HttpServletResponse response, Message message) throws IOException, JMSException {
int messages = 0; int messages = 0;
try { try {
@ -251,7 +251,7 @@ public class MessageServlet extends MessageServletSupport {
if (type != null) { if (type != null) {
response.setContentType(type); response.setContentType(type);
} }
setResponseHeaders(response, message); setResponseHeaders(response, message);
writeMessageResponse(writer, message); writeMessageResponse(writer, message);
} }
@ -266,14 +266,18 @@ public class MessageServlet extends MessageServletSupport {
if (message instanceof TextMessage) { if (message instanceof TextMessage) {
TextMessage textMsg = (TextMessage)message; TextMessage textMsg = (TextMessage)message;
String txt = textMsg.getText(); String txt = textMsg.getText();
if (txt.startsWith("<?")) { if (txt != null) {
txt = txt.substring(txt.indexOf("?>") + 2); if (txt.startsWith("<?")) {
txt = txt.substring(txt.indexOf("?>") + 2);
}
writer.print(txt);
} }
writer.print(txt);
} else if (message instanceof ObjectMessage) { } else if (message instanceof ObjectMessage) {
ObjectMessage objectMsg = (ObjectMessage)message; ObjectMessage objectMsg = (ObjectMessage)message;
Object object = objectMsg.getObject(); Object object = objectMsg.getObject();
writer.print(object.toString()); if (object != null) {
writer.print(object.toString());
}
} }
} }
@ -281,25 +285,25 @@ public class MessageServlet extends MessageServletSupport {
String rico = request.getParameter("rico"); String rico = request.getParameter("rico");
return rico != null && rico.equals("true"); return rico != null && rico.equals("true");
} }
public WebClient getWebClient(HttpServletRequest request) { public WebClient getWebClient(HttpServletRequest request) {
String clientId = request.getParameter("clientId"); String clientId = request.getParameter("clientId");
if (clientId != null) { if (clientId != null) {
synchronized(this) { synchronized(this) {
LOG.debug("Getting local client [" + clientId + "]"); LOG.debug("Getting local client [" + clientId + "]");
WebClient client = clients.get(clientId); WebClient client = clients.get(clientId);
if (client == null) { if (client == null) {
LOG.debug("Creating new client [" + clientId + "]"); LOG.debug("Creating new client [" + clientId + "]");
client = new WebClient(); client = new WebClient();
clients.put(clientId, client); clients.put(clientId, client);
} }
return client; return client;
} }
} else { } else {
return WebClient.getWebClient(request); return WebClient.getWebClient(request);
} }
} }
protected String getContentType(HttpServletRequest request) { protected String getContentType(HttpServletRequest request) {
/* /*
@ -365,7 +369,7 @@ public class MessageServlet extends MessageServletSupport {
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error receiving message " + e, e); LOG.error("Error receiving message " + e, e);
} }
continuation.resume(); continuation.resume();
} }
} }
} }