git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1052259 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2010-12-23 13:17:12 +00:00
parent 10a403b6ab
commit bef96a9248
4 changed files with 97 additions and 10 deletions

View File

@ -131,6 +131,14 @@
<groupId>org.apache.derby</groupId> <groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId> <artifactId>derby</artifactId>
</dependency> </dependency>
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<properties> <properties>

View File

@ -0,0 +1,55 @@
package org.apache.activemq.web;
import java.util.Enumeration;
import javax.jms.TextMessage;
import javax.jms.MessageProducer;
import javax.management.ObjectName;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.httpclient.*;
import org.apache.commons.httpclient.methods.*;
import java.util.Set;
public class AjaxTest extends JettyTestSupport {
private static final Log LOG = LogFactory.getLog(AjaxTest.class);
private String expectedResponse = "<ajax-response>\n" +
"<response id='handler' destination='queue://test' >test one</response>\n" +
"<response id='handler' destination='queue://test' >test two</response>\n" +
"<response id='handler' destination='queue://test' >test three</response>\n" +
"</ajax-response>";
public void testReceiveMultipleMessagesFromQueue() throws Exception {
MessageProducer local_producer = session.createProducer(session.createQueue("test"));
HttpClient httpClient = new HttpClient();
PostMethod post = new PostMethod( "http://localhost:8080/amq" );
post.addParameter( "destination", "queue://test" );
post.addParameter( "type", "listen" );
post.addParameter( "message", "handler" );
httpClient.executeMethod( post );
// send message
TextMessage msg1 = session.createTextMessage("test one");
producer.send(msg1);
TextMessage msg2 = session.createTextMessage("test two");
producer.send(msg2);
TextMessage msg3 = session.createTextMessage("test three");
producer.send(msg3);
HttpMethod get = new GetMethod( "http://localhost:8080/amq?timeout=5000" );
httpClient.executeMethod( get );
byte[] responseBody = get.getResponseBody();
String response = new String( responseBody );
LOG.info("Poll response: " + response);
assertEquals("Poll response not right", expectedResponse.trim(), response.trim());
}
}

View File

@ -27,6 +27,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.activemq.MessageAvailableListener; import org.apache.activemq.MessageAvailableListener;
import java.util.LinkedList;
/* /*
* Listen for available messages and wakeup any continuations. * Listen for available messages and wakeup any continuations.
*/ */
@ -37,10 +39,12 @@ public class AjaxListener implements MessageAvailableListener {
private AjaxWebClient client; private AjaxWebClient client;
private long lastAccess; private long lastAccess;
private Continuation continuation; private Continuation continuation;
private LinkedList<Message> unconsumedMessages = new LinkedList<Message>();
AjaxListener(AjaxWebClient client, long maximumReadTimeout) { AjaxListener(AjaxWebClient client, long maximumReadTimeout) {
this.client = client; this.client = client;
this.maximumReadTimeout = maximumReadTimeout; this.maximumReadTimeout = maximumReadTimeout;
access();
} }
public void access() { public void access() {
@ -51,9 +55,13 @@ public class AjaxListener implements MessageAvailableListener {
this.continuation = continuation; this.continuation = continuation;
} }
public LinkedList<Message> getUnconsumedMessages() {
return unconsumedMessages;
}
public synchronized void onMessageAvailable(MessageConsumer consumer) { public synchronized void onMessageAvailable(MessageConsumer consumer) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("message for " + consumer + "continuation=" + continuation); LOG.debug("message for " + consumer + " continuation=" + continuation);
} }
if (continuation != null) { if (continuation != null) {
try { try {
@ -70,6 +78,15 @@ public class AjaxListener implements MessageAvailableListener {
client.closeConsumers(); client.closeConsumers();
}; };
}.start(); }.start();
} else {
try {
Message message = consumer.receive(10);
if (message != null) {
unconsumedMessages.addLast(message);
}
} catch (Exception e) {
LOG.error("Error receiving message " + e, e);
}
} }
} }
} }

View File

@ -20,13 +20,7 @@ package org.apache.activemq.web;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.HashMap; import java.util.*;
import java.util.List;
import java.util.Map;
import java.util.Date;
import java.util.Iterator;
import java.util.Timer;
import java.util.TimerTask;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.JMSException; import javax.jms.JMSException;
@ -297,7 +291,7 @@ public class MessageListenerServlet extends MessageServletSupport {
response.setContentType("text/xml"); response.setContentType("text/xml");
response.setHeader("Cache-Control", "no-cache"); response.setHeader("Cache-Control", "no-cache");
if (message == null) { if (message == null && client.getListener().getUnconsumedMessages().size() == 0) {
Continuation continuation = ContinuationSupport.getContinuation(request); Continuation continuation = ContinuationSupport.getContinuation(request);
if (continuation.isExpired()) { if (continuation.isExpired()) {
@ -319,6 +313,7 @@ public class MessageListenerServlet extends MessageServletSupport {
// Fetch the listeners // Fetch the listeners
AjaxListener listener = client.getListener(); AjaxListener listener = client.getListener();
listener.access();
// register this continuation with our listener. // register this continuation with our listener.
listener.setContinuation(continuation); listener.setContinuation(continuation);
@ -350,6 +345,18 @@ public class MessageListenerServlet extends MessageServletSupport {
continue; continue;
} }
LinkedList<Message> unconsumedMessages = ((AjaxListener)consumer.getAvailableListener()).getUnconsumedMessages();
LOG.debug("Send " + unconsumedMessages.size() + " unconsumed messages");
for (Message msg : unconsumedMessages) {
messages++;
String id = consumerIdMap.get(consumer);
String destinationName = consumerDestinationNameMap.get(consumer);
writeMessageResponse(writer, msg, id, destinationName);
if (messages >= maximumMessages) {
break;
}
}
// Look for any available messages // Look for any available messages
while (messages < maximumMessages) { while (messages < maximumMessages) {
message = consumer.receiveNoWait(); message = consumer.receiveNoWait();