https://issues.apache.org/jira/browse/AMQ-3123 - messages for web clients need to be associated with the correct consumer.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1072620 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alex Dean 2011-02-20 16:09:28 +00:00
parent 8649f65c54
commit c6ede162c3
4 changed files with 156 additions and 34 deletions

View File

@ -463,4 +463,75 @@ public class AjaxTest extends JettyTestSupport {
assertEquals( "Poll response is not correct.", expected, poll.getResponseContent() ); assertEquals( "Poll response is not correct.", expected, poll.getResponseContent() );
} }
public void testAjaxClientReceivesMessagesForMultipleTopics() throws Exception {
LOG.debug( "*** testAjaxClientReceivesMessagesForMultipleTopics ***" );
HttpClient httpClient = new HttpClient();
httpClient.start();
httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
LOG.debug( "SENDING LISTEN FOR /topic/topicA" );
AjaxTestContentExchange contentExchange = new AjaxTestContentExchange();
contentExchange.setMethod( "POST" );
contentExchange.setURL("http://localhost:8080/amq");
contentExchange.setRequestContent( new ByteArrayBuffer("destination=topic://topicA&type=listen&message=handlerA") );
contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8" );
httpClient.send(contentExchange);
contentExchange.waitForDone();
String jsessionid = contentExchange.getJsessionId();
LOG.debug( "SENDING LISTEN FOR /topic/topicB" );
contentExchange = new AjaxTestContentExchange();
contentExchange.setMethod( "POST" );
contentExchange.setURL("http://localhost:8080/amq");
contentExchange.setRequestContent( new ByteArrayBuffer("destination=topic://topicB&type=listen&message=handlerB") );
contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8" );
contentExchange.setRequestHeader( "Cookie", jsessionid );
httpClient.send(contentExchange);
contentExchange.waitForDone();
// client 1 polls for messages
LOG.debug( "SENDING POLL" );
AjaxTestContentExchange poll = new AjaxTestContentExchange();
poll.setMethod( "GET" );
poll.setURL("http://localhost:8080/amq?timeout=5000");
poll.setRequestHeader( "Cookie", jsessionid );
httpClient.send( poll );
// while client 1 is polling, client 2 sends messages to the topics
LOG.debug( "SENDING MESSAGES" );
contentExchange = new AjaxTestContentExchange();
contentExchange.setMethod( "POST" );
contentExchange.setURL("http://localhost:8080/amq");
contentExchange.setRequestContent( new ByteArrayBuffer(
"destination=topic://topicA&type=send&message=A1&"+
"d1=topic://topicB&t1=send&m1=B1&"+
"d2=topic://topicA&t2=send&m2=A2&"+
"d3=topic://topicB&t3=send&m3=B2"
) );
contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8" );
httpClient.send(contentExchange);
contentExchange.waitForDone();
LOG.debug( "DONE POSTING MESSAGES" );
// wait for poll to finish
poll.waitForDone();
String response = poll.getResponseContent();
// not all messages might be delivered during the 1st poll. We need to check again.
poll = new AjaxTestContentExchange();
poll.setMethod( "GET" );
poll.setURL("http://localhost:8080/amq?timeout=5000");
poll.setRequestHeader( "Cookie", jsessionid );
httpClient.send( poll );
poll.waitForDone();
String fullResponse = response + poll.getResponseContent();
LOG.debug( "full response " + fullResponse );
assertContains( "<response id='handlerA' destination='topic://topicA' >A1</response>\n", fullResponse );
assertContains( "<response id='handlerB' destination='topic://topicB' >B1</response>\n", fullResponse );
assertContains( "<response id='handlerA' destination='topic://topicA' >A2</response>\n", fullResponse );
assertContains( "<response id='handlerB' destination='topic://topicB' >B2</response>\n", fullResponse );
assertResponseCount( 4, fullResponse );
}
} }

View File

@ -39,7 +39,7 @@ public class AjaxListener implements MessageAvailableListener {
private AjaxWebClient client; private AjaxWebClient client;
private long lastAccess; private long lastAccess;
private Continuation continuation; private Continuation continuation;
private LinkedList<Message> unconsumedMessages = new LinkedList<Message>(); private LinkedList<UndeliveredAjaxMessage> undeliveredMessages = new LinkedList<UndeliveredAjaxMessage>();
AjaxListener(AjaxWebClient client, long maximumReadTimeout) { AjaxListener(AjaxWebClient client, long maximumReadTimeout) {
this.client = client; this.client = client;
@ -55,8 +55,8 @@ public class AjaxListener implements MessageAvailableListener {
this.continuation = continuation; this.continuation = continuation;
} }
public LinkedList<Message> getUnconsumedMessages() { public LinkedList<UndeliveredAjaxMessage> getUndeliveredMessages() {
return unconsumedMessages; return undeliveredMessages;
} }
public synchronized void onMessageAvailable(MessageConsumer consumer) { public synchronized void onMessageAvailable(MessageConsumer consumer) {
@ -70,12 +70,11 @@ public class AjaxListener implements MessageAvailableListener {
if( message != null ) { if( message != null ) {
if( continuation.isSuspended() ) { if( continuation.isSuspended() ) {
LOG.debug( "Resuming suspended continuation " + continuation ); LOG.debug( "Resuming suspended continuation " + continuation );
continuation.setAttribute("message", message); continuation.setAttribute("undelivered_message", new UndeliveredAjaxMessage( message, consumer ) );
continuation.setAttribute("consumer", consumer);
continuation.resume(); continuation.resume();
} else { } else {
LOG.debug( "Message available, but continuation is already resumed. Buffer for next time." ); LOG.debug( "Message available, but continuation is already resumed. Buffer for next time." );
bufferMessageForDelivery( message ); bufferMessageForDelivery( message, consumer );
} }
} }
} catch (Exception e) { } catch (Exception e) {
@ -91,17 +90,17 @@ public class AjaxListener implements MessageAvailableListener {
} else { } else {
try { try {
Message message = consumer.receive(10); Message message = consumer.receive(10);
bufferMessageForDelivery( message ); bufferMessageForDelivery( message, consumer );
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error receiving message " + e, e); LOG.error("Error receiving message " + e, e);
} }
} }
} }
public void bufferMessageForDelivery( Message message ) { public void bufferMessageForDelivery( Message message, MessageConsumer consumer ) {
if( message != null ) { if( message != null ) {
synchronized( unconsumedMessages ) { synchronized( undeliveredMessages ) {
unconsumedMessages.addLast(message); undeliveredMessages.addLast( new UndeliveredAjaxMessage( message, consumer ) );
} }
} }
} }

View File

@ -263,15 +263,22 @@ public class MessageListenerServlet extends MessageServletSupport {
LOG.debug("doMessage timeout=" + timeout); LOG.debug("doMessage timeout=" + timeout);
} }
Message message = null;
// this is non-null if we're resuming the continuation. // this is non-null if we're resuming the continuation.
// attributes set in AjaxListener // attributes set in AjaxListener
message = (Message)request.getAttribute("message"); UndeliveredAjaxMessage undelivered_message = null;
Message message = null;
undelivered_message = (UndeliveredAjaxMessage)request.getAttribute("undelivered_message");
if( undelivered_message != null ) {
message = (Message)undelivered_message.getMessage();
}
synchronized (client) { synchronized (client) {
List consumers = client.getConsumers(); List consumers = client.getConsumers();
MessageAvailableConsumer consumer = (MessageAvailableConsumer)request.getAttribute("consumer"); MessageAvailableConsumer consumer = null;
if( undelivered_message != null ) {
consumer = (MessageAvailableConsumer)undelivered_message.getConsumer();
}
if (message == null) { if (message == null) {
// Look for a message that is ready to go // Look for a message that is ready to go
@ -293,7 +300,7 @@ public class MessageListenerServlet extends MessageServletSupport {
response.setContentType("text/xml"); response.setContentType("text/xml");
response.setHeader("Cache-Control", "no-cache"); response.setHeader("Cache-Control", "no-cache");
if (message == null && client.getListener().getUnconsumedMessages().size() == 0) { if (message == null && client.getListener().getUndeliveredMessages().size() == 0) {
Continuation continuation = ContinuationSupport.getContinuation(request); Continuation continuation = ContinuationSupport.getContinuation(request);
if (continuation.isExpired()) { if (continuation.isExpired()) {
@ -336,11 +343,33 @@ public class MessageListenerServlet extends MessageServletSupport {
if (message != null) { if (message != null) {
String id = consumerIdMap.get(consumer); String id = consumerIdMap.get(consumer);
String destinationName = consumerDestinationNameMap.get(consumer); String destinationName = consumerDestinationNameMap.get(consumer);
LOG.debug( "sending pre-existing message" );
writeMessageResponse(writer, message, id, destinationName); writeMessageResponse(writer, message, id, destinationName);
messages++; messages++;
} }
// send messages buffered while continuation was unavailable.
LinkedList<UndeliveredAjaxMessage> undeliveredMessages = ((AjaxListener)consumer.getAvailableListener()).getUndeliveredMessages();
LOG.debug("Send " + undeliveredMessages.size() + " unconsumed messages");
synchronized( undeliveredMessages ) {
for (Iterator<UndeliveredAjaxMessage> it = undeliveredMessages.iterator(); it.hasNext(); ) {
messages++;
UndeliveredAjaxMessage undelivered = it.next();
Message msg = (Message)undelivered.getMessage();
consumer = (MessageAvailableConsumer)undelivered.getConsumer();
String id = consumerIdMap.get(consumer);
String destinationName = consumerDestinationNameMap.get(consumer);
LOG.debug( "sending undelivered/buffered messages" );
LOG.debug( "msg:" +msg+ ", id:" +id+ ", destinationName:" +destinationName);
writeMessageResponse(writer, msg, id, destinationName);
it.remove();
if (messages >= maximumMessages) {
break;
}
}
}
// Send the rest of the messages // Send the rest of the messages
for (int i = 0; i < consumers.size() && messages < maximumMessages; i++) { for (int i = 0; i < consumers.size() && messages < maximumMessages; i++) {
consumer = (MessageAvailableConsumer)consumers.get(i); consumer = (MessageAvailableConsumer)consumers.get(i);
@ -348,22 +377,6 @@ public class MessageListenerServlet extends MessageServletSupport {
continue; continue;
} }
LinkedList<Message> unconsumedMessages = ((AjaxListener)consumer.getAvailableListener()).getUnconsumedMessages();
LOG.debug("Send " + unconsumedMessages.size() + " unconsumed messages");
synchronized( unconsumedMessages ) {
for (Iterator<Message> it = unconsumedMessages.iterator(); it.hasNext(); ) {
messages++;
Message msg = it.next();
String id = consumerIdMap.get(consumer);
String destinationName = consumerDestinationNameMap.get(consumer);
writeMessageResponse(writer, msg, id, destinationName);
it.remove();
if (messages >= maximumMessages) {
break;
}
}
}
// Look for any available messages // Look for any available messages
while (messages < maximumMessages) { while (messages < maximumMessages) {
message = consumer.receiveNoWait(); message = consumer.receiveNoWait();
@ -373,6 +386,7 @@ public class MessageListenerServlet extends MessageServletSupport {
messages++; messages++;
String id = consumerIdMap.get(consumer); String id = consumerIdMap.get(consumer);
String destinationName = consumerDestinationNameMap.get(consumer); String destinationName = consumerDestinationNameMap.get(consumer);
LOG.debug( "sending final available messages" );
writeMessageResponse(writer, message, id, destinationName); writeMessageResponse(writer, message, id, destinationName);
} }
} }

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.web;
import javax.jms.Message;
import javax.jms.MessageConsumer;
class UndeliveredAjaxMessage {
private Message message;
private MessageConsumer consumer;
UndeliveredAjaxMessage( Message message, MessageConsumer consumer ) {
this.message = message;
this.consumer = consumer;
}
public MessageConsumer getConsumer() {
return this.consumer;
}
public Message getMessage() {
return this.message;
}
}