diff --git a/activemq-web-demo/pom.xml b/activemq-web-demo/pom.xml index ad879af7ac..ab161879ed 100755 --- a/activemq-web-demo/pom.xml +++ b/activemq-web-demo/pom.xml @@ -131,6 +131,14 @@ org.apache.derby derby + + + commons-httpclient + commons-httpclient + 3.1 + test + + diff --git a/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java b/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java new file mode 100644 index 0000000000..4cb7dfcd8d --- /dev/null +++ b/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java @@ -0,0 +1,55 @@ +package org.apache.activemq.web; + + +import java.util.Enumeration; +import javax.jms.TextMessage; +import javax.jms.MessageProducer; +import javax.management.ObjectName; + +import org.apache.activemq.broker.jmx.DestinationViewMBean; +import org.apache.activemq.broker.jmx.SubscriptionViewMBean; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.httpclient.*; +import org.apache.commons.httpclient.methods.*; +import java.util.Set; + +public class AjaxTest extends JettyTestSupport { + private static final Log LOG = LogFactory.getLog(AjaxTest.class); + + private String expectedResponse = "\n" + + "test one\n" + + "test two\n" + + "test three\n" + + ""; + + public void testReceiveMultipleMessagesFromQueue() throws Exception { + + MessageProducer local_producer = session.createProducer(session.createQueue("test")); + + HttpClient httpClient = new HttpClient(); + PostMethod post = new PostMethod( "http://localhost:8080/amq" ); + post.addParameter( "destination", "queue://test" ); + post.addParameter( "type", "listen" ); + post.addParameter( "message", "handler" ); + httpClient.executeMethod( post ); + + // send message + TextMessage msg1 = session.createTextMessage("test one"); + producer.send(msg1); + TextMessage msg2 = session.createTextMessage("test two"); + producer.send(msg2); + TextMessage msg3 = session.createTextMessage("test three"); + producer.send(msg3); + + HttpMethod get = new GetMethod( "http://localhost:8080/amq?timeout=5000" ); + httpClient.executeMethod( get ); + byte[] responseBody = get.getResponseBody(); + String response = new String( responseBody ); + + LOG.info("Poll response: " + response); + assertEquals("Poll response not right", expectedResponse.trim(), response.trim()); + } + +} diff --git a/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java b/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java index b7b0133eff..aff812b8df 100644 --- a/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java +++ b/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java @@ -27,6 +27,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.activemq.MessageAvailableListener; +import java.util.LinkedList; + /* * Listen for available messages and wakeup any continuations. */ @@ -37,10 +39,12 @@ public class AjaxListener implements MessageAvailableListener { private AjaxWebClient client; private long lastAccess; private Continuation continuation; + private LinkedList unconsumedMessages = new LinkedList(); AjaxListener(AjaxWebClient client, long maximumReadTimeout) { this.client = client; this.maximumReadTimeout = maximumReadTimeout; + access(); } public void access() { @@ -51,9 +55,13 @@ public class AjaxListener implements MessageAvailableListener { this.continuation = continuation; } + public LinkedList getUnconsumedMessages() { + return unconsumedMessages; + } + public synchronized void onMessageAvailable(MessageConsumer consumer) { if (LOG.isDebugEnabled()) { - LOG.debug("message for " + consumer + "continuation=" + continuation); + LOG.debug("message for " + consumer + " continuation=" + continuation); } if (continuation != null) { try { @@ -70,6 +78,15 @@ public class AjaxListener implements MessageAvailableListener { client.closeConsumers(); }; }.start(); + } else { + try { + Message message = consumer.receive(10); + if (message != null) { + unconsumedMessages.addLast(message); + } + } catch (Exception e) { + LOG.error("Error receiving message " + e, e); + } } } } diff --git a/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java b/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java index d5fdd93129..e338459be4 100644 --- a/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java +++ b/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java @@ -20,13 +20,7 @@ package org.apache.activemq.web; import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Date; -import java.util.Iterator; -import java.util.Timer; -import java.util.TimerTask; +import java.util.*; import javax.jms.Destination; import javax.jms.JMSException; @@ -297,10 +291,10 @@ public class MessageListenerServlet extends MessageServletSupport { response.setContentType("text/xml"); response.setHeader("Cache-Control", "no-cache"); - if (message == null) { + if (message == null && client.getListener().getUnconsumedMessages().size() == 0) { Continuation continuation = ContinuationSupport.getContinuation(request); - if (continuation.isExpired()) { + if (continuation.isExpired()) { response.setStatus(HttpServletResponse.SC_OK); StringWriter swriter = new StringWriter(); PrintWriter writer = new PrintWriter(swriter); @@ -319,6 +313,7 @@ public class MessageListenerServlet extends MessageServletSupport { // Fetch the listeners AjaxListener listener = client.getListener(); + listener.access(); // register this continuation with our listener. listener.setContinuation(continuation); @@ -350,6 +345,18 @@ public class MessageListenerServlet extends MessageServletSupport { continue; } + LinkedList unconsumedMessages = ((AjaxListener)consumer.getAvailableListener()).getUnconsumedMessages(); + LOG.debug("Send " + unconsumedMessages.size() + " unconsumed messages"); + for (Message msg : unconsumedMessages) { + messages++; + String id = consumerIdMap.get(consumer); + String destinationName = consumerDestinationNameMap.get(consumer); + writeMessageResponse(writer, msg, id, destinationName); + if (messages >= maximumMessages) { + break; + } + } + // Look for any available messages while (messages < maximumMessages) { message = consumer.receiveNoWait();