From 2b9c59c2833a7ea18de0d3e0586ba0d15de34166 Mon Sep 17 00:00:00 2001 From: Bosanac Dejan Date: Tue, 11 May 2010 15:37:44 +0000 Subject: [PATCH] https://issues.apache.org/activemq/browse/AMQ-2728 - MessageServlet refactoring git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@943146 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/http/WaitForJettyListener.java | 2 +- .../apache/activemq/web/JettyTestSupport.java | 34 +++- .../org/apache/activemq/web/RestTest.java | 26 ++- .../apache/activemq/web/MessageServlet.java | 183 ++++++++++-------- 4 files changed, 155 insertions(+), 90 deletions(-) diff --git a/activemq-optional/src/test/java/org/apache/activemq/transport/http/WaitForJettyListener.java b/activemq-optional/src/test/java/org/apache/activemq/transport/http/WaitForJettyListener.java index 06d4b10738..028bb30b28 100644 --- a/activemq-optional/src/test/java/org/apache/activemq/transport/http/WaitForJettyListener.java +++ b/activemq-optional/src/test/java/org/apache/activemq/transport/http/WaitForJettyListener.java @@ -41,7 +41,7 @@ public class WaitForJettyListener { socket.close(); canConnect = true; } catch (Exception e) { - LOG.warn("verify jettty available, failed to connect to " + url + e); + LOG.warn("verify jetty available, failed to connect to " + url + e); } return canConnect; }}, 60 * 1000)); diff --git a/activemq-web-demo/src/test/java/org/apache/activemq/web/JettyTestSupport.java b/activemq-web-demo/src/test/java/org/apache/activemq/web/JettyTestSupport.java index 94e9b3c602..f99254bede 100644 --- a/activemq-web-demo/src/test/java/org/apache/activemq/web/JettyTestSupport.java +++ b/activemq-web-demo/src/test/java/org/apache/activemq/web/JettyTestSupport.java @@ -1,20 +1,28 @@ package org.apache.activemq.web; +import java.net.Socket; +import java.net.URL; + import javax.jms.Connection; import javax.jms.MessageProducer; import javax.jms.Session; +import javax.net.SocketFactory; + +import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.Wait; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.nio.SelectChannelConnector; import org.eclipse.jetty.webapp.WebAppContext; -import junit.framework.TestCase; - public class JettyTestSupport extends TestCase { - + private static final Log LOG = LogFactory.getLog(JettyTestSupport.class); + BrokerService broker; Server server; ActiveMQConnectionFactory factory; @@ -43,7 +51,8 @@ public class JettyTestSupport extends TestCase { server.setConnectors(new Connector[] { connector }); - server.start(); + server.start(); + waitForJettySocketToAccept("http://localhost:8080"); factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); connection = factory.createConnection(); @@ -60,6 +69,21 @@ public class JettyTestSupport extends TestCase { connection.close(); } - + public void waitForJettySocketToAccept(String bindLocation) throws Exception { + final URL url = new URL(bindLocation); + assertTrue("Jetty endpoint is available", Wait.waitFor(new Wait.Condition() { + + public boolean isSatisified() throws Exception { + boolean canConnect = false; + try { + Socket socket = SocketFactory.getDefault().createSocket(url.getHost(), url.getPort()); + socket.close(); + canConnect = true; + } catch (Exception e) { + LOG.warn("verify jetty available, failed to connect to " + url + e); + } + return canConnect; + }}, 60 * 1000)); + } } diff --git a/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java b/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java index 681a068a46..83ddc2ed42 100644 --- a/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java +++ b/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java @@ -2,13 +2,17 @@ package org.apache.activemq.web; import javax.jms.TextMessage; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.eclipse.jetty.client.ContentExchange; import org.eclipse.jetty.client.HttpClient; public class RestTest extends JettyTestSupport { + private static final Log LOG = LogFactory.getLog(RestTest.class); public void testConsume() throws Exception { producer.send(session.createTextMessage("test")); + LOG.info("message sent"); HttpClient httpClient = new HttpClient(); httpClient.start(); @@ -17,18 +21,36 @@ public class RestTest extends JettyTestSupport { contentExchange.setURL("http://localhost:8080/message/test?readTimeout=1000&type=queue"); httpClient.send(contentExchange); contentExchange.waitForDone(); - assertEquals("test", contentExchange.getResponseContent()); - + assertEquals("test", contentExchange.getResponseContent()); + } + + public void testSubscribeFirst() throws Exception { + HttpClient httpClient = new HttpClient(); + httpClient.start(); + ContentExchange contentExchange = new ContentExchange(); + httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL); + contentExchange.setURL("http://localhost:8080/message/test?readTimeout=5000&type=queue"); + httpClient.send(contentExchange); + + Thread.sleep(1000); + + producer.send(session.createTextMessage("test")); + LOG.info("message sent"); + + contentExchange.waitForDone(); + assertEquals("test", contentExchange.getResponseContent()); } public void testSelector() throws Exception { TextMessage msg1 = session.createTextMessage("test1"); msg1.setIntProperty("test", 1); producer.send(msg1); + LOG.info("message 1 sent"); TextMessage msg2 = session.createTextMessage("test2"); msg2.setIntProperty("test", 2); producer.send(msg2); + LOG.info("message 2 sent"); HttpClient httpClient = new HttpClient(); httpClient.start(); diff --git a/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java b/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java index 3c364c425b..f0255d8bf4 100644 --- a/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java +++ b/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java @@ -162,111 +162,125 @@ public class MessageServlet extends MessageServletSupport { * @throws IOException */ protected void doMessages(HttpServletRequest request, HttpServletResponse response, int maxMessages) throws ServletException, IOException { - - int messages = 0; try { WebClient client = getWebClient(request); Destination destination = getDestination(client, request); if (destination == null) { throw new NoDestinationSuppliedException(); } - long timeout = getReadTimeout(request); - boolean ajax = isRicoAjax(request); - if (!ajax) { - maxMessages = 1; + MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName)); + Message message = null; + message = (Message)request.getAttribute("message"); + if (message != null) { + // we're resuming continuation, + // so just write the message and return + writeResponse(request, response, maxMessages, message, consumer); + return; } + long timeout = getReadTimeout(request); if (LOG.isDebugEnabled()) { LOG.debug("Receiving message(s) from: " + destination + " with timeout: " + timeout); } - MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName)); Continuation continuation = null; Listener listener = null; - Message message = null; + - synchronized (consumer) { + // Look for any available messages + message = consumer.receive(10); + + // Get an existing Continuation or create a new one if there are + // no events. + if (message == null) { + continuation = ContinuationSupport.getContinuation(request); + + if (continuation.isExpired()) { + response.setStatus(isRicoAjax(request) ? HttpServletResponse.SC_OK : HttpServletResponse.SC_NO_CONTENT); + return; + } + + continuation.setTimeout(timeout); + continuation.suspend(); + // Fetch the listeners listener = (Listener)consumer.getAvailableListener(); if (listener == null) { listener = new Listener(consumer); consumer.setAvailableListener(listener); } - // Look for any available messages - message = consumer.receiveNoWait(); - // Get an existing Continuation or create a new one if there are - // no events. - if (message == null) { - continuation = ContinuationSupport.getContinuation(request); - - // register this continuation with our listener. - listener.setContinuation(continuation); - - // Get the continuation object (may wait and/or retry - // request here). - continuation.setTimeout(timeout); - continuation.suspend(); - } - - // Try again now - if (message == null) { - message = consumer.receiveNoWait(); - } - - // write a responds - response.setContentType("text/xml"); - PrintWriter writer = response.getWriter(); - - if (ajax) { - writer.println(""); - } - - // handle any message(s) - if (message == null) { - // No messages so OK response of for ajax else no content. - response.setStatus(ajax ? HttpServletResponse.SC_OK : HttpServletResponse.SC_NO_CONTENT); - } else { - // We have at least one message so set up the response - response.setStatus(HttpServletResponse.SC_OK); - String type = getContentType(request); - if (type != null) { - response.setContentType(type); - } - - // send a response for each available message (up to max - // messages) - while ((maxMessages < 0 || messages < maxMessages) && message != null) { - if (ajax) { - writer.print(""); - } else { - // only ever 1 message for non ajax! - setResponseHeaders(response, message); - } - - writeMessageResponse(writer, message); - - if (ajax) { - writer.println(""); - } - - // look for next message - messages++; - if(maxMessages < 0 || messages < maxMessages) { - message = consumer.receiveNoWait(); - } - } - } - - if (ajax) { - writer.println(""); - writer.println(""); - } + // register this continuation with our listener. + listener.setContinuation(continuation); } + + writeResponse(request, response, maxMessages, message, consumer); } catch (JMSException e) { throw new ServletException("Could not post JMS message: " + e, e); + } + } + + protected void writeResponse(HttpServletRequest request, HttpServletResponse response, int maxMessages, Message message, MessageAvailableConsumer consumer) throws IOException, JMSException { + int messages = 0; + try { + boolean ajax = isRicoAjax(request); + if (!ajax) { + maxMessages = 1; + } + + // write a responds + response.setContentType("text/xml"); + PrintWriter writer = response.getWriter(); + + if (ajax) { + writer.println(""); + } + + // handle any message(s) + if (message == null) { + // No messages so OK response of for ajax else no content. + response.setStatus(ajax ? HttpServletResponse.SC_OK + : HttpServletResponse.SC_NO_CONTENT); + } else { + // We have at least one message so set up the response + response.setStatus(HttpServletResponse.SC_OK); + String type = getContentType(request); + if (type != null) { + response.setContentType(type); + } + + // send a response for each available message (up to max + // messages) + while ((maxMessages < 0 || messages < maxMessages) + && message != null) { + if (ajax) { + writer.print(""); + } else { + // only ever 1 message for non ajax! + setResponseHeaders(response, message); + } + + writeMessageResponse(writer, message); + + if (ajax) { + writer.println(""); + } + + // look for next message + messages++; + if (maxMessages < 0 || messages < maxMessages) { + message = consumer.receiveNoWait(); + } + } + } + + if (ajax) { + writer + .println(""); + writer.println(""); + } } finally { if (LOG.isDebugEnabled()) { LOG.debug("Received " + messages + " message(s)"); @@ -475,9 +489,14 @@ public class MessageServlet extends MessageServletSupport { synchronized (this.consumer) { if (continuation != null) { - continuation.resume(); + try { + Message message = consumer.receiveNoWait(); + continuation.setAttribute("message", message); + } catch (Exception e) { + e.printStackTrace(); + } + continuation.resume(); } - continuation = null; } } }