From 88fc7fec5091ae36d5ceb05203577dd3875dee14 Mon Sep 17 00:00:00 2001 From: Alex Dean Date: Fri, 28 Jan 2011 15:21:02 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-3094 - ajax does not receive all messages * ensure delivery of messages received by AjaxListener when a continuation is not available for resumption. * add test coverage for several common uses of AjaxServlet * switch back to jetty httpclient for better processing of asynchronous HTTP git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1064725 13f79535-47bb-0310-9956-ffa450edef68 --- activemq-web-demo/pom.xml | 7 - .../org/apache/activemq/web/AjaxTest.java | 475 ++++++++++++++++-- .../apache/activemq/web/JettyTestSupport.java | 1 + .../org/apache/activemq/web/AjaxListener.java | 28 +- .../activemq/web/MessageListenerServlet.java | 21 +- 5 files changed, 474 insertions(+), 58 deletions(-) diff --git a/activemq-web-demo/pom.xml b/activemq-web-demo/pom.xml index ab161879ed..a6681e77e6 100755 --- a/activemq-web-demo/pom.xml +++ b/activemq-web-demo/pom.xml @@ -132,13 +132,6 @@ derby - - commons-httpclient - commons-httpclient - 3.1 - test - - diff --git a/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java b/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java index 7760e0cac1..3024ffeff9 100644 --- a/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java +++ b/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java @@ -16,52 +16,451 @@ */ package org.apache.activemq.web; +import org.apache.activemq.transport.stomp.StompConnection; +import org.apache.activemq.transport.stomp.StompFrame; +import org.apache.activemq.transport.stomp.Stomp; + +import java.lang.Thread; +import java.net.SocketTimeoutException; -import org.apache.commons.httpclient.HttpClient; -import org.apache.commons.httpclient.HttpMethod; -import org.apache.commons.httpclient.methods.GetMethod; -import org.apache.commons.httpclient.methods.PostMethod; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.commons.lang.StringUtils; + +import java.util.*; +import org.eclipse.jetty.io.Buffer; +import org.eclipse.jetty.client.ContentExchange; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.io.ByteArrayBuffer; import javax.jms.MessageProducer; +import javax.jms.Message; import javax.jms.TextMessage; public class AjaxTest extends JettyTestSupport { private static final Log LOG = LogFactory.getLog(AjaxTest.class); - - private String expectedResponse = "\n" + - "test one\n" + - "test two\n" + - "test three\n" + - ""; - - public void testReceiveMultipleMessagesFromQueue() throws Exception { - - MessageProducer local_producer = session.createProducer(session.createQueue("test")); - - HttpClient httpClient = new HttpClient(); - PostMethod post = new PostMethod( "http://localhost:8080/amq" ); - post.addParameter( "destination", "queue://test" ); - post.addParameter( "type", "listen" ); - post.addParameter( "message", "handler" ); - httpClient.executeMethod( post ); - - // send message - TextMessage msg1 = session.createTextMessage("test one"); - producer.send(msg1); - TextMessage msg2 = session.createTextMessage("test two"); - producer.send(msg2); - TextMessage msg3 = session.createTextMessage("test three"); - producer.send(msg3); - - HttpMethod get = new GetMethod( "http://localhost:8080/amq?timeout=5000" ); - httpClient.executeMethod( get ); - byte[] responseBody = get.getResponseBody(); - String response = new String( responseBody ); - - LOG.info("Poll response: " + response); - assertEquals("Poll response not right", expectedResponse.trim(), response.trim()); + + private class AjaxTestContentExchange extends ContentExchange { + private HashMap headers; + private String responseContent; + + AjaxTestContentExchange() { + super(true); + this.headers = new HashMap(); + this.responseContent = ""; + } + protected void onResponseContent( Buffer content ) { + this.responseContent += content.toString(); + } + protected void onResponseHeader( Buffer name, Buffer value ) { + headers.put( name.toString(), value.toString() ); + } + public String getJsessionId() { + String cookie = headers.get( "Set-Cookie" ); + String[] cookie_parts = cookie.split( ";" ); + return cookie_parts[0]; + } + public String getResponseContent() { + return responseContent; + } } - + + public void assertContains( String expected, String actual ) { + assertTrue( "'"+actual+"' does not contain expected fragment '"+expected+"'", actual.indexOf( expected ) != -1 ); + } + public void assertResponseCount( int expected, String actual ) { + int occurrences = StringUtils.countMatches( actual, " elements is not correct.", expected, occurrences ); + } + + public void testAjaxClientReceivesMessagesWhichAreSentToQueueWhileClientIsPolling() throws Exception { + LOG.debug( "*** testAjaxClientReceivesMessagesWhichAreSentToQueueWhileClientIsPolling ***" ); + + HttpClient httpClient = new HttpClient(); + httpClient.start(); + httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL); + + // client 1 subscribes to a queue + LOG.debug( "SENDING LISTEN" ); + AjaxTestContentExchange contentExchange = new AjaxTestContentExchange(); + contentExchange.setMethod( "POST" ); + contentExchange.setURL("http://localhost:8080/amq"); + contentExchange.setRequestContent( new ByteArrayBuffer("destination=queue://test&type=listen&message=handler") ); + contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8" ); + httpClient.send(contentExchange); + contentExchange.waitForDone(); + String jsessionid = contentExchange.getJsessionId(); + + // client 1 polls for messages + LOG.debug( "SENDING POLL" ); + AjaxTestContentExchange poll = new AjaxTestContentExchange(); + poll.setMethod( "GET" ); + poll.setURL("http://localhost:8080/amq?timeout=5000"); + poll.setRequestHeader( "Cookie", jsessionid ); + httpClient.send( poll ); + + // while client 1 is polling, client 2 sends messages to the queue + LOG.debug( "SENDING MESSAGES" ); + contentExchange = new AjaxTestContentExchange(); + contentExchange.setMethod( "POST" ); + contentExchange.setURL("http://localhost:8080/amq"); + contentExchange.setRequestContent( new ByteArrayBuffer( + "destination=queue://test&type=send&message=msg1&"+ + "d1=queue://test&t1=send&m1=msg2&"+ + "d2=queue://test&t2=send&m2=msg3" + ) ); + contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8" ); + httpClient.send(contentExchange); + contentExchange.waitForDone(); + LOG.debug( "DONE POSTING MESSAGES" ); + + // wait for poll to finish + poll.waitForDone(); + String response = poll.getResponseContent(); + + // messages might not all be delivered during the 1st poll. We need to check again. + poll = new AjaxTestContentExchange(); + poll.setMethod( "GET" ); + poll.setURL("http://localhost:8080/amq?timeout=5000"); + poll.setRequestHeader( "Cookie", jsessionid ); + httpClient.send( poll ); + poll.waitForDone(); + + String fullResponse = response + poll.getResponseContent(); + LOG.debug( "full response : " + fullResponse ); + + assertContains( "msg1\n", fullResponse ); + assertContains( "msg2\n", fullResponse ); + assertContains( "msg3\n", fullResponse ); + assertResponseCount( 3, fullResponse ); + } + + public void testAjaxClientReceivesMessagesWhichAreSentToTopicWhileClientIsPolling() throws Exception { + LOG.debug( "*** testAjaxClientReceivesMessagesWhichAreSentToTopicWhileClientIsPolling ***" ); + + HttpClient httpClient = new HttpClient(); + httpClient.start(); + httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL); + + // client 1 subscribes to a queue + LOG.debug( "SENDING LISTEN" ); + AjaxTestContentExchange contentExchange = new AjaxTestContentExchange(); + contentExchange.setMethod( "POST" ); + contentExchange.setURL("http://localhost:8080/amq"); + contentExchange.setRequestContent( new ByteArrayBuffer("destination=topic://test&type=listen&message=handler") ); + contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8" ); + httpClient.send(contentExchange); + contentExchange.waitForDone(); + String jsessionid = contentExchange.getJsessionId(); + + // client 1 polls for messages + LOG.debug( "SENDING POLL" ); + AjaxTestContentExchange poll = new AjaxTestContentExchange(); + poll.setMethod( "GET" ); + poll.setURL("http://localhost:8080/amq?timeout=5000"); + poll.setRequestHeader( "Cookie", jsessionid ); + httpClient.send( poll ); + + // while client 1 is polling, client 2 sends messages to the queue + LOG.debug( "SENDING MESSAGES" ); + contentExchange = new AjaxTestContentExchange(); + contentExchange.setMethod( "POST" ); + contentExchange.setURL("http://localhost:8080/amq"); + contentExchange.setRequestContent( new ByteArrayBuffer( + "destination=topic://test&type=send&message=msg1&"+ + "d1=topic://test&t1=send&m1=msg2&"+ + "d2=topic://test&t2=send&m2=msg3" + ) ); + contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8" ); + httpClient.send(contentExchange); + contentExchange.waitForDone(); + LOG.debug( "DONE POSTING MESSAGES" ); + + // wait for poll to finish + poll.waitForDone(); + String response = poll.getResponseContent(); + + // not all messages might be delivered during the 1st poll. We need to check again. + poll = new AjaxTestContentExchange(); + poll.setMethod( "GET" ); + poll.setURL("http://localhost:8080/amq?timeout=5000"); + poll.setRequestHeader( "Cookie", jsessionid ); + httpClient.send( poll ); + poll.waitForDone(); + + String fullResponse = response + poll.getResponseContent(); + LOG.debug( "full response : " + fullResponse ); + + assertContains( "msg1\n", fullResponse ); + assertContains( "msg2\n", fullResponse ); + assertContains( "msg3\n", fullResponse ); + assertResponseCount( 3, fullResponse ); + } + + public void testAjaxClientReceivesMessagesWhichAreQueuedBeforeClientSubscribes() throws Exception { + LOG.debug( "*** testAjaxClientReceivesMessagesWhichAreQueuedBeforeClientSubscribes ***" ); + // send messages to queue://test + producer.send( session.createTextMessage("test one") ); + producer.send( session.createTextMessage("test two") ); + producer.send( session.createTextMessage("test three") ); + + HttpClient httpClient = new HttpClient(); + httpClient.start(); + httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL); + + // client 1 subscribes to queue + LOG.debug( "SENDING LISTEN" ); + AjaxTestContentExchange contentExchange = new AjaxTestContentExchange(); + contentExchange.setMethod( "POST" ); + contentExchange.setURL("http://localhost:8080/amq"); + contentExchange.setRequestContent( new ByteArrayBuffer("destination=queue://test&type=listen&message=handler") ); + contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8" ); + httpClient.send(contentExchange); + contentExchange.waitForDone(); + String jsessionid = contentExchange.getJsessionId(); + + // client 1 polls for messages + LOG.debug( "SENDING POLL" ); + AjaxTestContentExchange poll = new AjaxTestContentExchange(); + poll.setMethod( "GET" ); + poll.setURL("http://localhost:8080/amq?timeout=5000"); + poll.setRequestHeader( "Cookie", jsessionid ); + httpClient.send( poll ); + + // wait for poll to finish + poll.waitForDone(); + String response = poll.getResponseContent(); + + assertContains( "test one\n", response ); + assertContains( "test two\n", response ); + assertContains( "test three\n", response ); + assertResponseCount( 3, response ); + } + + public void testStompMessagesAreReceivedByAjaxClient() throws Exception { + LOG.debug( "*** testStompMessagesAreRecievedByAjaxClient ***" ); + + HttpClient httpClient = new HttpClient(); + httpClient.start(); + httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL); + + // client 1 subscribes to a queue + LOG.debug( "SENDING LISTEN" ); + AjaxTestContentExchange contentExchange = new AjaxTestContentExchange(); + contentExchange.setMethod( "POST" ); + contentExchange.setURL("http://localhost:8080/amq"); + contentExchange.setRequestContent( new ByteArrayBuffer("destination=queue://test&type=listen&message=handler") ); + contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8" ); + httpClient.send(contentExchange); + contentExchange.waitForDone(); + String jsessionid = contentExchange.getJsessionId(); + + // client 1 polls for messages + LOG.debug( "SENDING POLL" ); + AjaxTestContentExchange poll = new AjaxTestContentExchange(); + poll.setMethod( "GET" ); + poll.setURL("http://localhost:8080/amq?timeout=5000"); + poll.setRequestHeader( "Cookie", jsessionid ); + httpClient.send( poll ); + + // stomp client queues some messages + StompConnection connection = new StompConnection(); + connection.open("localhost", 61613); + connection.connect("user", "password"); + HashMap headers = new HashMap(); + headers.put( "amq-msg-type", "text" ); + connection.send( "/queue/test", "message1", (String)null, headers ); + connection.send( "/queue/test", "message2", (String)null, headers ); + connection.send( "/queue/test", "message3", (String)null, headers ); + connection.send( "/queue/test", "message4", (String)null, headers ); + connection.send( "/queue/test", "message5", (String)null, headers ); + connection.disconnect(); + + // wait for poll to finish + poll.waitForDone(); + String response = poll.getResponseContent(); + + // not all messages might be delivered during the 1st poll. We need to check again. + poll = new AjaxTestContentExchange(); + poll.setMethod( "GET" ); + poll.setURL("http://localhost:8080/amq?timeout=5000"); + poll.setRequestHeader( "Cookie", jsessionid ); + httpClient.send( poll ); + poll.waitForDone(); + + String fullResponse = response + poll.getResponseContent(); + + assertContains( "message1\n", fullResponse ); + assertContains( "message2\n", fullResponse ); + assertContains( "message3\n", fullResponse ); + assertContains( "message4\n", fullResponse ); + assertContains( "message5\n", fullResponse ); + assertResponseCount( 5, fullResponse ); + } + + public void testAjaxMessagesAreReceivedByStompClient() throws Exception { + LOG.debug( "*** testAjaxMessagesAreReceivedByStompClient ***" ); + + HttpClient httpClient = new HttpClient(); + httpClient.start(); + httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL); + + AjaxTestContentExchange contentExchange = new AjaxTestContentExchange(); + contentExchange.setMethod( "POST" ); + contentExchange.setURL("http://localhost:8080/amq"); + contentExchange.setRequestContent( new ByteArrayBuffer( + "destination=queue://test&type=send&message=msg1&"+ + "d1=queue://test&t1=send&m1=msg2&"+ + "d2=queue://test&t2=send&m2=msg3&"+ + "d3=queue://test&t3=send&m3=msg4") ); + contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8" ); + httpClient.send(contentExchange); + contentExchange.waitForDone(); + + StompConnection connection = new StompConnection(); + connection.open("localhost", 61613); + connection.connect("user", "password"); + connection.subscribe( "/queue/test" ); + + StompFrame message; + String allMessageBodies = ""; + try { + while( true ) { + message = connection.receive(5000); + allMessageBodies = allMessageBodies +"\n"+ message.getBody(); + } + } catch (SocketTimeoutException e) {} + + LOG.debug( "All message bodies : " + allMessageBodies ); + + assertContains( "msg1", allMessageBodies ); + assertContains( "msg2", allMessageBodies ); + assertContains( "msg3", allMessageBodies ); + assertContains( "msg4", allMessageBodies ); + } + + public void testAjaxClientMayUseSelectors() throws Exception { + LOG.debug( "*** testAjaxClientMayUseSelectors ***" ); + + // send 2 messages to the same queue w/ different 'filter' values. + Message msg = session.createTextMessage("test one"); + msg.setStringProperty( "filter", "one" ); + producer.send( msg ); + msg = session.createTextMessage("test two"); + msg.setStringProperty( "filter", "two" ); + producer.send( msg ); + + HttpClient httpClient = new HttpClient(); + httpClient.start(); + httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL); + + // client ubscribes to queue + LOG.debug( "SENDING LISTEN" ); + AjaxTestContentExchange contentExchange = new AjaxTestContentExchange(); + contentExchange.setMethod( "POST" ); + contentExchange.setURL("http://localhost:8080/amq"); + contentExchange.setRequestContent( new ByteArrayBuffer("destination=queue://test&type=listen&message=handler") ); + contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8" ); + // SELECTOR + contentExchange.setRequestHeader( "selector", "filter='two'" ); + httpClient.send(contentExchange); + contentExchange.waitForDone(); + String jsessionid = contentExchange.getJsessionId(); + + // client 1 polls for messages + LOG.debug( "SENDING POLL" ); + AjaxTestContentExchange poll = new AjaxTestContentExchange(); + poll.setMethod( "GET" ); + poll.setURL("http://localhost:8080/amq?timeout=5000"); + poll.setRequestHeader( "Cookie", jsessionid ); + httpClient.send( poll ); + poll.waitForDone(); + + LOG.debug( poll.getResponseContent() ); + + String expected = "\n" + + "test two\n" + + "\n"; + assertEquals( "Poll response is not correct.", expected, poll.getResponseContent() ); + + } + + public void testMultipleAjaxClientsMayExistInTheSameSession() throws Exception { + LOG.debug( "*** testMultipleAjaxClientsMayExistInTheSameSession ***" ); + + // send messages to queues testA and testB. + MessageProducer producerA = session.createProducer(session.createQueue("testA")); + MessageProducer producerB = session.createProducer(session.createQueue("testB")); + producerA.send( session.createTextMessage("A1") ); + producerA.send( session.createTextMessage("A2") ); + producerB.send( session.createTextMessage("B1") ); + producerB.send( session.createTextMessage("B2") ); + + HttpClient httpClient = new HttpClient(); + httpClient.start(); + httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL); + + // clientA subscribes to /queue/testA + LOG.debug( "SENDING LISTEN" ); + AjaxTestContentExchange contentExchange = new AjaxTestContentExchange(); + contentExchange.setMethod( "POST" ); + contentExchange.setURL("http://localhost:8080/amq"); + contentExchange.setRequestContent( new ByteArrayBuffer( + "destination=queue://testA&"+ + "type=listen&"+ + "message=handlerA&"+ + "clientId=clientA" + ) ); + contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8" ); + httpClient.send(contentExchange); + contentExchange.waitForDone(); + String jsessionid = contentExchange.getJsessionId(); + + // clientB subscribes to /queue/testB using the same JSESSIONID. + contentExchange = new AjaxTestContentExchange(); + contentExchange.setMethod( "POST" ); + contentExchange.setURL("http://localhost:8080/amq"); + contentExchange.setRequestHeader( "Cookie", jsessionid ); + contentExchange.setRequestContent( new ByteArrayBuffer( + "destination=queue://testB&"+ + "type=listen&"+ + "message=handlerB&"+ + "clientId=clientB" + ) ); + contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8" ); + httpClient.send(contentExchange); + contentExchange.waitForDone(); + + // clientA polls for messages + AjaxTestContentExchange poll = new AjaxTestContentExchange(); + poll.setMethod( "GET" ); + poll.setURL("http://localhost:8080/amq?timeout=5000&clientId=clientA"); + poll.setRequestHeader( "Cookie", jsessionid ); + httpClient.send( poll ); + poll.waitForDone(); + + LOG.debug( "clientA response : " + poll.getResponseContent() ); + String expected = "\n" + + "A1\n" + + "A2\n" + + "\n"; + assertEquals( "Poll response is not correct.", expected, poll.getResponseContent() ); + + // clientB polls for messages + poll = new AjaxTestContentExchange(); + poll.setMethod( "GET" ); + poll.setURL("http://localhost:8080/amq?timeout=5000&clientId=clientB"); + poll.setRequestHeader( "Cookie", jsessionid ); + httpClient.send( poll ); + poll.waitForDone(); + + LOG.debug( "clientB response : " + poll.getResponseContent() ); + expected = "\n" + + "B1\n" + + "B2\n" + + "\n"; + assertEquals( "Poll response is not correct.", expected, poll.getResponseContent() ); + } + } diff --git a/activemq-web-demo/src/test/java/org/apache/activemq/web/JettyTestSupport.java b/activemq-web-demo/src/test/java/org/apache/activemq/web/JettyTestSupport.java index a833e367a2..bc38568181 100644 --- a/activemq-web-demo/src/test/java/org/apache/activemq/web/JettyTestSupport.java +++ b/activemq-web-demo/src/test/java/org/apache/activemq/web/JettyTestSupport.java @@ -51,6 +51,7 @@ public class JettyTestSupport extends TestCase { broker.setPersistent(false); broker.setUseJmx(true); broker.addConnector("tcp://localhost:61616"); + broker.addConnector("stomp://localhost:61613"); broker.start(); broker.waitUntilStarted(); diff --git a/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java b/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java index aff812b8df..597b5f82c3 100644 --- a/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java +++ b/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java @@ -66,12 +66,22 @@ public class AjaxListener implements MessageAvailableListener { if (continuation != null) { try { Message message = consumer.receive(10); - continuation.setAttribute("message", message); - continuation.setAttribute("consumer", consumer); + LOG.debug( "message is " + message ); + if( message != null ) { + if( continuation.isSuspended() ) { + LOG.debug( "Resuming suspended continuation " + continuation ); + continuation.setAttribute("message", message); + continuation.setAttribute("consumer", consumer); + continuation.resume(); + } else { + LOG.debug( "Message available, but continuation is already resumed. Buffer for next time." ); + bufferMessageForDelivery( message ); + } + } } catch (Exception e) { LOG.error("Error receiving message " + e, e); } - continuation.resume(); + } else if (System.currentTimeMillis() - lastAccess > 2 * this.maximumReadTimeout) { new Thread() { public void run() { @@ -81,12 +91,18 @@ public class AjaxListener implements MessageAvailableListener { } else { try { Message message = consumer.receive(10); - if (message != null) { - unconsumedMessages.addLast(message); - } + bufferMessageForDelivery( message ); } catch (Exception e) { LOG.error("Error receiving message " + e, e); } } } + + public void bufferMessageForDelivery( Message message ) { + if( message != null ) { + synchronized( unconsumedMessages ) { + unconsumedMessages.addLast(message); + } + } + } } diff --git a/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java b/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java index e338459be4..1d65ae95cf 100644 --- a/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java +++ b/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java @@ -264,6 +264,8 @@ public class MessageListenerServlet extends MessageServletSupport { } Message message = null; + // this is non-null if we're resuming the continuation. + // attributes set in AjaxListener message = (Message)request.getAttribute("message"); synchronized (client) { @@ -310,6 +312,7 @@ public class MessageListenerServlet extends MessageServletSupport { continuation.setTimeout(timeout); continuation.suspend(); + LOG.debug( "Suspending continuation " + continuation ); // Fetch the listeners AjaxListener listener = client.getListener(); @@ -347,13 +350,17 @@ public class MessageListenerServlet extends MessageServletSupport { LinkedList unconsumedMessages = ((AjaxListener)consumer.getAvailableListener()).getUnconsumedMessages(); LOG.debug("Send " + unconsumedMessages.size() + " unconsumed messages"); - for (Message msg : unconsumedMessages) { - messages++; - String id = consumerIdMap.get(consumer); - String destinationName = consumerDestinationNameMap.get(consumer); - writeMessageResponse(writer, msg, id, destinationName); - if (messages >= maximumMessages) { - break; + synchronized( unconsumedMessages ) { + for (Iterator it = unconsumedMessages.iterator(); it.hasNext(); ) { + messages++; + Message msg = it.next(); + String id = consumerIdMap.get(consumer); + String destinationName = consumerDestinationNameMap.get(consumer); + writeMessageResponse(writer, msg, id, destinationName); + it.remove(); + if (messages >= maximumMessages) { + break; + } } }