mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-3094 - ajax does not receive all messages
* ensure delivery of messages received by AjaxListener when a continuation is not available for resumption. * add test coverage for several common uses of AjaxServlet * switch back to jetty httpclient for better processing of asynchronous HTTP git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1064725 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
acc3d4f7cb
commit
88fc7fec50
|
@ -132,13 +132,6 @@
|
||||||
<artifactId>derby</artifactId>
|
<artifactId>derby</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
|
||||||
<groupId>commons-httpclient</groupId>
|
|
||||||
<artifactId>commons-httpclient</artifactId>
|
|
||||||
<version>3.1</version>
|
|
||||||
<scope>test</scope>
|
|
||||||
</dependency>
|
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
|
|
|
@ -16,52 +16,451 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.web;
|
package org.apache.activemq.web;
|
||||||
|
|
||||||
|
import org.apache.activemq.transport.stomp.StompConnection;
|
||||||
|
import org.apache.activemq.transport.stomp.StompFrame;
|
||||||
|
import org.apache.activemq.transport.stomp.Stomp;
|
||||||
|
|
||||||
|
import java.lang.Thread;
|
||||||
|
import java.net.SocketTimeoutException;
|
||||||
|
|
||||||
import org.apache.commons.httpclient.HttpClient;
|
|
||||||
import org.apache.commons.httpclient.HttpMethod;
|
|
||||||
import org.apache.commons.httpclient.methods.GetMethod;
|
|
||||||
import org.apache.commons.httpclient.methods.PostMethod;
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.commons.lang.StringUtils;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
|
import org.eclipse.jetty.io.Buffer;
|
||||||
|
import org.eclipse.jetty.client.ContentExchange;
|
||||||
|
import org.eclipse.jetty.client.HttpClient;
|
||||||
|
import org.eclipse.jetty.http.HttpFields;
|
||||||
|
import org.eclipse.jetty.io.ByteArrayBuffer;
|
||||||
|
|
||||||
import javax.jms.MessageProducer;
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Message;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
|
|
||||||
public class AjaxTest extends JettyTestSupport {
|
public class AjaxTest extends JettyTestSupport {
|
||||||
private static final Log LOG = LogFactory.getLog(AjaxTest.class);
|
private static final Log LOG = LogFactory.getLog(AjaxTest.class);
|
||||||
|
|
||||||
private String expectedResponse = "<ajax-response>\n" +
|
private class AjaxTestContentExchange extends ContentExchange {
|
||||||
"<response id='handler' destination='queue://test' >test one</response>\n" +
|
private HashMap<String,String> headers;
|
||||||
"<response id='handler' destination='queue://test' >test two</response>\n" +
|
private String responseContent;
|
||||||
"<response id='handler' destination='queue://test' >test three</response>\n" +
|
|
||||||
"</ajax-response>";
|
AjaxTestContentExchange() {
|
||||||
|
super(true);
|
||||||
public void testReceiveMultipleMessagesFromQueue() throws Exception {
|
this.headers = new HashMap<String,String>();
|
||||||
|
this.responseContent = "";
|
||||||
MessageProducer local_producer = session.createProducer(session.createQueue("test"));
|
}
|
||||||
|
protected void onResponseContent( Buffer content ) {
|
||||||
HttpClient httpClient = new HttpClient();
|
this.responseContent += content.toString();
|
||||||
PostMethod post = new PostMethod( "http://localhost:8080/amq" );
|
}
|
||||||
post.addParameter( "destination", "queue://test" );
|
protected void onResponseHeader( Buffer name, Buffer value ) {
|
||||||
post.addParameter( "type", "listen" );
|
headers.put( name.toString(), value.toString() );
|
||||||
post.addParameter( "message", "handler" );
|
}
|
||||||
httpClient.executeMethod( post );
|
public String getJsessionId() {
|
||||||
|
String cookie = headers.get( "Set-Cookie" );
|
||||||
// send message
|
String[] cookie_parts = cookie.split( ";" );
|
||||||
TextMessage msg1 = session.createTextMessage("test one");
|
return cookie_parts[0];
|
||||||
producer.send(msg1);
|
}
|
||||||
TextMessage msg2 = session.createTextMessage("test two");
|
public String getResponseContent() {
|
||||||
producer.send(msg2);
|
return responseContent;
|
||||||
TextMessage msg3 = session.createTextMessage("test three");
|
}
|
||||||
producer.send(msg3);
|
|
||||||
|
|
||||||
HttpMethod get = new GetMethod( "http://localhost:8080/amq?timeout=5000" );
|
|
||||||
httpClient.executeMethod( get );
|
|
||||||
byte[] responseBody = get.getResponseBody();
|
|
||||||
String response = new String( responseBody );
|
|
||||||
|
|
||||||
LOG.info("Poll response: " + response);
|
|
||||||
assertEquals("Poll response not right", expectedResponse.trim(), response.trim());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void assertContains( String expected, String actual ) {
|
||||||
|
assertTrue( "'"+actual+"' does not contain expected fragment '"+expected+"'", actual.indexOf( expected ) != -1 );
|
||||||
|
}
|
||||||
|
public void assertResponseCount( int expected, String actual ) {
|
||||||
|
int occurrences = StringUtils.countMatches( actual, "<response" );
|
||||||
|
assertEquals( "Expected number of <response> elements is not correct.", expected, occurrences );
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testAjaxClientReceivesMessagesWhichAreSentToQueueWhileClientIsPolling() throws Exception {
|
||||||
|
LOG.debug( "*** testAjaxClientReceivesMessagesWhichAreSentToQueueWhileClientIsPolling ***" );
|
||||||
|
|
||||||
|
HttpClient httpClient = new HttpClient();
|
||||||
|
httpClient.start();
|
||||||
|
httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
|
||||||
|
|
||||||
|
// client 1 subscribes to a queue
|
||||||
|
LOG.debug( "SENDING LISTEN" );
|
||||||
|
AjaxTestContentExchange contentExchange = new AjaxTestContentExchange();
|
||||||
|
contentExchange.setMethod( "POST" );
|
||||||
|
contentExchange.setURL("http://localhost:8080/amq");
|
||||||
|
contentExchange.setRequestContent( new ByteArrayBuffer("destination=queue://test&type=listen&message=handler") );
|
||||||
|
contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8" );
|
||||||
|
httpClient.send(contentExchange);
|
||||||
|
contentExchange.waitForDone();
|
||||||
|
String jsessionid = contentExchange.getJsessionId();
|
||||||
|
|
||||||
|
// client 1 polls for messages
|
||||||
|
LOG.debug( "SENDING POLL" );
|
||||||
|
AjaxTestContentExchange poll = new AjaxTestContentExchange();
|
||||||
|
poll.setMethod( "GET" );
|
||||||
|
poll.setURL("http://localhost:8080/amq?timeout=5000");
|
||||||
|
poll.setRequestHeader( "Cookie", jsessionid );
|
||||||
|
httpClient.send( poll );
|
||||||
|
|
||||||
|
// while client 1 is polling, client 2 sends messages to the queue
|
||||||
|
LOG.debug( "SENDING MESSAGES" );
|
||||||
|
contentExchange = new AjaxTestContentExchange();
|
||||||
|
contentExchange.setMethod( "POST" );
|
||||||
|
contentExchange.setURL("http://localhost:8080/amq");
|
||||||
|
contentExchange.setRequestContent( new ByteArrayBuffer(
|
||||||
|
"destination=queue://test&type=send&message=msg1&"+
|
||||||
|
"d1=queue://test&t1=send&m1=msg2&"+
|
||||||
|
"d2=queue://test&t2=send&m2=msg3"
|
||||||
|
) );
|
||||||
|
contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8" );
|
||||||
|
httpClient.send(contentExchange);
|
||||||
|
contentExchange.waitForDone();
|
||||||
|
LOG.debug( "DONE POSTING MESSAGES" );
|
||||||
|
|
||||||
|
// wait for poll to finish
|
||||||
|
poll.waitForDone();
|
||||||
|
String response = poll.getResponseContent();
|
||||||
|
|
||||||
|
// messages might not all be delivered during the 1st poll. We need to check again.
|
||||||
|
poll = new AjaxTestContentExchange();
|
||||||
|
poll.setMethod( "GET" );
|
||||||
|
poll.setURL("http://localhost:8080/amq?timeout=5000");
|
||||||
|
poll.setRequestHeader( "Cookie", jsessionid );
|
||||||
|
httpClient.send( poll );
|
||||||
|
poll.waitForDone();
|
||||||
|
|
||||||
|
String fullResponse = response + poll.getResponseContent();
|
||||||
|
LOG.debug( "full response : " + fullResponse );
|
||||||
|
|
||||||
|
assertContains( "<response id='handler' destination='queue://test' >msg1</response>\n", fullResponse );
|
||||||
|
assertContains( "<response id='handler' destination='queue://test' >msg2</response>\n", fullResponse );
|
||||||
|
assertContains( "<response id='handler' destination='queue://test' >msg3</response>\n", fullResponse );
|
||||||
|
assertResponseCount( 3, fullResponse );
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testAjaxClientReceivesMessagesWhichAreSentToTopicWhileClientIsPolling() throws Exception {
|
||||||
|
LOG.debug( "*** testAjaxClientReceivesMessagesWhichAreSentToTopicWhileClientIsPolling ***" );
|
||||||
|
|
||||||
|
HttpClient httpClient = new HttpClient();
|
||||||
|
httpClient.start();
|
||||||
|
httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
|
||||||
|
|
||||||
|
// client 1 subscribes to a queue
|
||||||
|
LOG.debug( "SENDING LISTEN" );
|
||||||
|
AjaxTestContentExchange contentExchange = new AjaxTestContentExchange();
|
||||||
|
contentExchange.setMethod( "POST" );
|
||||||
|
contentExchange.setURL("http://localhost:8080/amq");
|
||||||
|
contentExchange.setRequestContent( new ByteArrayBuffer("destination=topic://test&type=listen&message=handler") );
|
||||||
|
contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8" );
|
||||||
|
httpClient.send(contentExchange);
|
||||||
|
contentExchange.waitForDone();
|
||||||
|
String jsessionid = contentExchange.getJsessionId();
|
||||||
|
|
||||||
|
// client 1 polls for messages
|
||||||
|
LOG.debug( "SENDING POLL" );
|
||||||
|
AjaxTestContentExchange poll = new AjaxTestContentExchange();
|
||||||
|
poll.setMethod( "GET" );
|
||||||
|
poll.setURL("http://localhost:8080/amq?timeout=5000");
|
||||||
|
poll.setRequestHeader( "Cookie", jsessionid );
|
||||||
|
httpClient.send( poll );
|
||||||
|
|
||||||
|
// while client 1 is polling, client 2 sends messages to the queue
|
||||||
|
LOG.debug( "SENDING MESSAGES" );
|
||||||
|
contentExchange = new AjaxTestContentExchange();
|
||||||
|
contentExchange.setMethod( "POST" );
|
||||||
|
contentExchange.setURL("http://localhost:8080/amq");
|
||||||
|
contentExchange.setRequestContent( new ByteArrayBuffer(
|
||||||
|
"destination=topic://test&type=send&message=msg1&"+
|
||||||
|
"d1=topic://test&t1=send&m1=msg2&"+
|
||||||
|
"d2=topic://test&t2=send&m2=msg3"
|
||||||
|
) );
|
||||||
|
contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8" );
|
||||||
|
httpClient.send(contentExchange);
|
||||||
|
contentExchange.waitForDone();
|
||||||
|
LOG.debug( "DONE POSTING MESSAGES" );
|
||||||
|
|
||||||
|
// wait for poll to finish
|
||||||
|
poll.waitForDone();
|
||||||
|
String response = poll.getResponseContent();
|
||||||
|
|
||||||
|
// not all messages might be delivered during the 1st poll. We need to check again.
|
||||||
|
poll = new AjaxTestContentExchange();
|
||||||
|
poll.setMethod( "GET" );
|
||||||
|
poll.setURL("http://localhost:8080/amq?timeout=5000");
|
||||||
|
poll.setRequestHeader( "Cookie", jsessionid );
|
||||||
|
httpClient.send( poll );
|
||||||
|
poll.waitForDone();
|
||||||
|
|
||||||
|
String fullResponse = response + poll.getResponseContent();
|
||||||
|
LOG.debug( "full response : " + fullResponse );
|
||||||
|
|
||||||
|
assertContains( "<response id='handler' destination='topic://test' >msg1</response>\n", fullResponse );
|
||||||
|
assertContains( "<response id='handler' destination='topic://test' >msg2</response>\n", fullResponse );
|
||||||
|
assertContains( "<response id='handler' destination='topic://test' >msg3</response>\n", fullResponse );
|
||||||
|
assertResponseCount( 3, fullResponse );
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testAjaxClientReceivesMessagesWhichAreQueuedBeforeClientSubscribes() throws Exception {
|
||||||
|
LOG.debug( "*** testAjaxClientReceivesMessagesWhichAreQueuedBeforeClientSubscribes ***" );
|
||||||
|
// send messages to queue://test
|
||||||
|
producer.send( session.createTextMessage("test one") );
|
||||||
|
producer.send( session.createTextMessage("test two") );
|
||||||
|
producer.send( session.createTextMessage("test three") );
|
||||||
|
|
||||||
|
HttpClient httpClient = new HttpClient();
|
||||||
|
httpClient.start();
|
||||||
|
httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
|
||||||
|
|
||||||
|
// client 1 subscribes to queue
|
||||||
|
LOG.debug( "SENDING LISTEN" );
|
||||||
|
AjaxTestContentExchange contentExchange = new AjaxTestContentExchange();
|
||||||
|
contentExchange.setMethod( "POST" );
|
||||||
|
contentExchange.setURL("http://localhost:8080/amq");
|
||||||
|
contentExchange.setRequestContent( new ByteArrayBuffer("destination=queue://test&type=listen&message=handler") );
|
||||||
|
contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8" );
|
||||||
|
httpClient.send(contentExchange);
|
||||||
|
contentExchange.waitForDone();
|
||||||
|
String jsessionid = contentExchange.getJsessionId();
|
||||||
|
|
||||||
|
// client 1 polls for messages
|
||||||
|
LOG.debug( "SENDING POLL" );
|
||||||
|
AjaxTestContentExchange poll = new AjaxTestContentExchange();
|
||||||
|
poll.setMethod( "GET" );
|
||||||
|
poll.setURL("http://localhost:8080/amq?timeout=5000");
|
||||||
|
poll.setRequestHeader( "Cookie", jsessionid );
|
||||||
|
httpClient.send( poll );
|
||||||
|
|
||||||
|
// wait for poll to finish
|
||||||
|
poll.waitForDone();
|
||||||
|
String response = poll.getResponseContent();
|
||||||
|
|
||||||
|
assertContains( "<response id='handler' destination='queue://test' >test one</response>\n", response );
|
||||||
|
assertContains( "<response id='handler' destination='queue://test' >test two</response>\n", response );
|
||||||
|
assertContains( "<response id='handler' destination='queue://test' >test three</response>\n", response );
|
||||||
|
assertResponseCount( 3, response );
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testStompMessagesAreReceivedByAjaxClient() throws Exception {
|
||||||
|
LOG.debug( "*** testStompMessagesAreRecievedByAjaxClient ***" );
|
||||||
|
|
||||||
|
HttpClient httpClient = new HttpClient();
|
||||||
|
httpClient.start();
|
||||||
|
httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
|
||||||
|
|
||||||
|
// client 1 subscribes to a queue
|
||||||
|
LOG.debug( "SENDING LISTEN" );
|
||||||
|
AjaxTestContentExchange contentExchange = new AjaxTestContentExchange();
|
||||||
|
contentExchange.setMethod( "POST" );
|
||||||
|
contentExchange.setURL("http://localhost:8080/amq");
|
||||||
|
contentExchange.setRequestContent( new ByteArrayBuffer("destination=queue://test&type=listen&message=handler") );
|
||||||
|
contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8" );
|
||||||
|
httpClient.send(contentExchange);
|
||||||
|
contentExchange.waitForDone();
|
||||||
|
String jsessionid = contentExchange.getJsessionId();
|
||||||
|
|
||||||
|
// client 1 polls for messages
|
||||||
|
LOG.debug( "SENDING POLL" );
|
||||||
|
AjaxTestContentExchange poll = new AjaxTestContentExchange();
|
||||||
|
poll.setMethod( "GET" );
|
||||||
|
poll.setURL("http://localhost:8080/amq?timeout=5000");
|
||||||
|
poll.setRequestHeader( "Cookie", jsessionid );
|
||||||
|
httpClient.send( poll );
|
||||||
|
|
||||||
|
// stomp client queues some messages
|
||||||
|
StompConnection connection = new StompConnection();
|
||||||
|
connection.open("localhost", 61613);
|
||||||
|
connection.connect("user", "password");
|
||||||
|
HashMap<String, String> headers = new HashMap<String, String>();
|
||||||
|
headers.put( "amq-msg-type", "text" );
|
||||||
|
connection.send( "/queue/test", "message1", (String)null, headers );
|
||||||
|
connection.send( "/queue/test", "message2", (String)null, headers );
|
||||||
|
connection.send( "/queue/test", "message3", (String)null, headers );
|
||||||
|
connection.send( "/queue/test", "message4", (String)null, headers );
|
||||||
|
connection.send( "/queue/test", "message5", (String)null, headers );
|
||||||
|
connection.disconnect();
|
||||||
|
|
||||||
|
// wait for poll to finish
|
||||||
|
poll.waitForDone();
|
||||||
|
String response = poll.getResponseContent();
|
||||||
|
|
||||||
|
// not all messages might be delivered during the 1st poll. We need to check again.
|
||||||
|
poll = new AjaxTestContentExchange();
|
||||||
|
poll.setMethod( "GET" );
|
||||||
|
poll.setURL("http://localhost:8080/amq?timeout=5000");
|
||||||
|
poll.setRequestHeader( "Cookie", jsessionid );
|
||||||
|
httpClient.send( poll );
|
||||||
|
poll.waitForDone();
|
||||||
|
|
||||||
|
String fullResponse = response + poll.getResponseContent();
|
||||||
|
|
||||||
|
assertContains( "<response id='handler' destination='queue://test' >message1</response>\n", fullResponse );
|
||||||
|
assertContains( "<response id='handler' destination='queue://test' >message2</response>\n", fullResponse );
|
||||||
|
assertContains( "<response id='handler' destination='queue://test' >message3</response>\n", fullResponse );
|
||||||
|
assertContains( "<response id='handler' destination='queue://test' >message4</response>\n", fullResponse );
|
||||||
|
assertContains( "<response id='handler' destination='queue://test' >message5</response>\n", fullResponse );
|
||||||
|
assertResponseCount( 5, fullResponse );
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testAjaxMessagesAreReceivedByStompClient() throws Exception {
|
||||||
|
LOG.debug( "*** testAjaxMessagesAreReceivedByStompClient ***" );
|
||||||
|
|
||||||
|
HttpClient httpClient = new HttpClient();
|
||||||
|
httpClient.start();
|
||||||
|
httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
|
||||||
|
|
||||||
|
AjaxTestContentExchange contentExchange = new AjaxTestContentExchange();
|
||||||
|
contentExchange.setMethod( "POST" );
|
||||||
|
contentExchange.setURL("http://localhost:8080/amq");
|
||||||
|
contentExchange.setRequestContent( new ByteArrayBuffer(
|
||||||
|
"destination=queue://test&type=send&message=msg1&"+
|
||||||
|
"d1=queue://test&t1=send&m1=msg2&"+
|
||||||
|
"d2=queue://test&t2=send&m2=msg3&"+
|
||||||
|
"d3=queue://test&t3=send&m3=msg4") );
|
||||||
|
contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8" );
|
||||||
|
httpClient.send(contentExchange);
|
||||||
|
contentExchange.waitForDone();
|
||||||
|
|
||||||
|
StompConnection connection = new StompConnection();
|
||||||
|
connection.open("localhost", 61613);
|
||||||
|
connection.connect("user", "password");
|
||||||
|
connection.subscribe( "/queue/test" );
|
||||||
|
|
||||||
|
StompFrame message;
|
||||||
|
String allMessageBodies = "";
|
||||||
|
try {
|
||||||
|
while( true ) {
|
||||||
|
message = connection.receive(5000);
|
||||||
|
allMessageBodies = allMessageBodies +"\n"+ message.getBody();
|
||||||
|
}
|
||||||
|
} catch (SocketTimeoutException e) {}
|
||||||
|
|
||||||
|
LOG.debug( "All message bodies : " + allMessageBodies );
|
||||||
|
|
||||||
|
assertContains( "msg1", allMessageBodies );
|
||||||
|
assertContains( "msg2", allMessageBodies );
|
||||||
|
assertContains( "msg3", allMessageBodies );
|
||||||
|
assertContains( "msg4", allMessageBodies );
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testAjaxClientMayUseSelectors() throws Exception {
|
||||||
|
LOG.debug( "*** testAjaxClientMayUseSelectors ***" );
|
||||||
|
|
||||||
|
// send 2 messages to the same queue w/ different 'filter' values.
|
||||||
|
Message msg = session.createTextMessage("test one");
|
||||||
|
msg.setStringProperty( "filter", "one" );
|
||||||
|
producer.send( msg );
|
||||||
|
msg = session.createTextMessage("test two");
|
||||||
|
msg.setStringProperty( "filter", "two" );
|
||||||
|
producer.send( msg );
|
||||||
|
|
||||||
|
HttpClient httpClient = new HttpClient();
|
||||||
|
httpClient.start();
|
||||||
|
httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
|
||||||
|
|
||||||
|
// client ubscribes to queue
|
||||||
|
LOG.debug( "SENDING LISTEN" );
|
||||||
|
AjaxTestContentExchange contentExchange = new AjaxTestContentExchange();
|
||||||
|
contentExchange.setMethod( "POST" );
|
||||||
|
contentExchange.setURL("http://localhost:8080/amq");
|
||||||
|
contentExchange.setRequestContent( new ByteArrayBuffer("destination=queue://test&type=listen&message=handler") );
|
||||||
|
contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8" );
|
||||||
|
// SELECTOR
|
||||||
|
contentExchange.setRequestHeader( "selector", "filter='two'" );
|
||||||
|
httpClient.send(contentExchange);
|
||||||
|
contentExchange.waitForDone();
|
||||||
|
String jsessionid = contentExchange.getJsessionId();
|
||||||
|
|
||||||
|
// client 1 polls for messages
|
||||||
|
LOG.debug( "SENDING POLL" );
|
||||||
|
AjaxTestContentExchange poll = new AjaxTestContentExchange();
|
||||||
|
poll.setMethod( "GET" );
|
||||||
|
poll.setURL("http://localhost:8080/amq?timeout=5000");
|
||||||
|
poll.setRequestHeader( "Cookie", jsessionid );
|
||||||
|
httpClient.send( poll );
|
||||||
|
poll.waitForDone();
|
||||||
|
|
||||||
|
LOG.debug( poll.getResponseContent() );
|
||||||
|
|
||||||
|
String expected = "<ajax-response>\n" +
|
||||||
|
"<response id='handler' destination='queue://test' >test two</response>\n" +
|
||||||
|
"</ajax-response>\n";
|
||||||
|
assertEquals( "Poll response is not correct.", expected, poll.getResponseContent() );
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testMultipleAjaxClientsMayExistInTheSameSession() throws Exception {
|
||||||
|
LOG.debug( "*** testMultipleAjaxClientsMayExistInTheSameSession ***" );
|
||||||
|
|
||||||
|
// send messages to queues testA and testB.
|
||||||
|
MessageProducer producerA = session.createProducer(session.createQueue("testA"));
|
||||||
|
MessageProducer producerB = session.createProducer(session.createQueue("testB"));
|
||||||
|
producerA.send( session.createTextMessage("A1") );
|
||||||
|
producerA.send( session.createTextMessage("A2") );
|
||||||
|
producerB.send( session.createTextMessage("B1") );
|
||||||
|
producerB.send( session.createTextMessage("B2") );
|
||||||
|
|
||||||
|
HttpClient httpClient = new HttpClient();
|
||||||
|
httpClient.start();
|
||||||
|
httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
|
||||||
|
|
||||||
|
// clientA subscribes to /queue/testA
|
||||||
|
LOG.debug( "SENDING LISTEN" );
|
||||||
|
AjaxTestContentExchange contentExchange = new AjaxTestContentExchange();
|
||||||
|
contentExchange.setMethod( "POST" );
|
||||||
|
contentExchange.setURL("http://localhost:8080/amq");
|
||||||
|
contentExchange.setRequestContent( new ByteArrayBuffer(
|
||||||
|
"destination=queue://testA&"+
|
||||||
|
"type=listen&"+
|
||||||
|
"message=handlerA&"+
|
||||||
|
"clientId=clientA"
|
||||||
|
) );
|
||||||
|
contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8" );
|
||||||
|
httpClient.send(contentExchange);
|
||||||
|
contentExchange.waitForDone();
|
||||||
|
String jsessionid = contentExchange.getJsessionId();
|
||||||
|
|
||||||
|
// clientB subscribes to /queue/testB using the same JSESSIONID.
|
||||||
|
contentExchange = new AjaxTestContentExchange();
|
||||||
|
contentExchange.setMethod( "POST" );
|
||||||
|
contentExchange.setURL("http://localhost:8080/amq");
|
||||||
|
contentExchange.setRequestHeader( "Cookie", jsessionid );
|
||||||
|
contentExchange.setRequestContent( new ByteArrayBuffer(
|
||||||
|
"destination=queue://testB&"+
|
||||||
|
"type=listen&"+
|
||||||
|
"message=handlerB&"+
|
||||||
|
"clientId=clientB"
|
||||||
|
) );
|
||||||
|
contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8" );
|
||||||
|
httpClient.send(contentExchange);
|
||||||
|
contentExchange.waitForDone();
|
||||||
|
|
||||||
|
// clientA polls for messages
|
||||||
|
AjaxTestContentExchange poll = new AjaxTestContentExchange();
|
||||||
|
poll.setMethod( "GET" );
|
||||||
|
poll.setURL("http://localhost:8080/amq?timeout=5000&clientId=clientA");
|
||||||
|
poll.setRequestHeader( "Cookie", jsessionid );
|
||||||
|
httpClient.send( poll );
|
||||||
|
poll.waitForDone();
|
||||||
|
|
||||||
|
LOG.debug( "clientA response : " + poll.getResponseContent() );
|
||||||
|
String expected = "<ajax-response>\n" +
|
||||||
|
"<response id='handlerA' destination='queue://testA' >A1</response>\n" +
|
||||||
|
"<response id='handlerA' destination='queue://testA' >A2</response>\n" +
|
||||||
|
"</ajax-response>\n";
|
||||||
|
assertEquals( "Poll response is not correct.", expected, poll.getResponseContent() );
|
||||||
|
|
||||||
|
// clientB polls for messages
|
||||||
|
poll = new AjaxTestContentExchange();
|
||||||
|
poll.setMethod( "GET" );
|
||||||
|
poll.setURL("http://localhost:8080/amq?timeout=5000&clientId=clientB");
|
||||||
|
poll.setRequestHeader( "Cookie", jsessionid );
|
||||||
|
httpClient.send( poll );
|
||||||
|
poll.waitForDone();
|
||||||
|
|
||||||
|
LOG.debug( "clientB response : " + poll.getResponseContent() );
|
||||||
|
expected = "<ajax-response>\n" +
|
||||||
|
"<response id='handlerB' destination='queue://testB' >B1</response>\n" +
|
||||||
|
"<response id='handlerB' destination='queue://testB' >B2</response>\n" +
|
||||||
|
"</ajax-response>\n";
|
||||||
|
assertEquals( "Poll response is not correct.", expected, poll.getResponseContent() );
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,6 +51,7 @@ public class JettyTestSupport extends TestCase {
|
||||||
broker.setPersistent(false);
|
broker.setPersistent(false);
|
||||||
broker.setUseJmx(true);
|
broker.setUseJmx(true);
|
||||||
broker.addConnector("tcp://localhost:61616");
|
broker.addConnector("tcp://localhost:61616");
|
||||||
|
broker.addConnector("stomp://localhost:61613");
|
||||||
broker.start();
|
broker.start();
|
||||||
broker.waitUntilStarted();
|
broker.waitUntilStarted();
|
||||||
|
|
||||||
|
|
|
@ -66,12 +66,22 @@ public class AjaxListener implements MessageAvailableListener {
|
||||||
if (continuation != null) {
|
if (continuation != null) {
|
||||||
try {
|
try {
|
||||||
Message message = consumer.receive(10);
|
Message message = consumer.receive(10);
|
||||||
continuation.setAttribute("message", message);
|
LOG.debug( "message is " + message );
|
||||||
continuation.setAttribute("consumer", consumer);
|
if( message != null ) {
|
||||||
|
if( continuation.isSuspended() ) {
|
||||||
|
LOG.debug( "Resuming suspended continuation " + continuation );
|
||||||
|
continuation.setAttribute("message", message);
|
||||||
|
continuation.setAttribute("consumer", consumer);
|
||||||
|
continuation.resume();
|
||||||
|
} else {
|
||||||
|
LOG.debug( "Message available, but continuation is already resumed. Buffer for next time." );
|
||||||
|
bufferMessageForDelivery( message );
|
||||||
|
}
|
||||||
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Error receiving message " + e, e);
|
LOG.error("Error receiving message " + e, e);
|
||||||
}
|
}
|
||||||
continuation.resume();
|
|
||||||
} else if (System.currentTimeMillis() - lastAccess > 2 * this.maximumReadTimeout) {
|
} else if (System.currentTimeMillis() - lastAccess > 2 * this.maximumReadTimeout) {
|
||||||
new Thread() {
|
new Thread() {
|
||||||
public void run() {
|
public void run() {
|
||||||
|
@ -81,12 +91,18 @@ public class AjaxListener implements MessageAvailableListener {
|
||||||
} else {
|
} else {
|
||||||
try {
|
try {
|
||||||
Message message = consumer.receive(10);
|
Message message = consumer.receive(10);
|
||||||
if (message != null) {
|
bufferMessageForDelivery( message );
|
||||||
unconsumedMessages.addLast(message);
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Error receiving message " + e, e);
|
LOG.error("Error receiving message " + e, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void bufferMessageForDelivery( Message message ) {
|
||||||
|
if( message != null ) {
|
||||||
|
synchronized( unconsumedMessages ) {
|
||||||
|
unconsumedMessages.addLast(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -264,6 +264,8 @@ public class MessageListenerServlet extends MessageServletSupport {
|
||||||
}
|
}
|
||||||
|
|
||||||
Message message = null;
|
Message message = null;
|
||||||
|
// this is non-null if we're resuming the continuation.
|
||||||
|
// attributes set in AjaxListener
|
||||||
message = (Message)request.getAttribute("message");
|
message = (Message)request.getAttribute("message");
|
||||||
|
|
||||||
synchronized (client) {
|
synchronized (client) {
|
||||||
|
@ -310,6 +312,7 @@ public class MessageListenerServlet extends MessageServletSupport {
|
||||||
|
|
||||||
continuation.setTimeout(timeout);
|
continuation.setTimeout(timeout);
|
||||||
continuation.suspend();
|
continuation.suspend();
|
||||||
|
LOG.debug( "Suspending continuation " + continuation );
|
||||||
|
|
||||||
// Fetch the listeners
|
// Fetch the listeners
|
||||||
AjaxListener listener = client.getListener();
|
AjaxListener listener = client.getListener();
|
||||||
|
@ -347,13 +350,17 @@ public class MessageListenerServlet extends MessageServletSupport {
|
||||||
|
|
||||||
LinkedList<Message> unconsumedMessages = ((AjaxListener)consumer.getAvailableListener()).getUnconsumedMessages();
|
LinkedList<Message> unconsumedMessages = ((AjaxListener)consumer.getAvailableListener()).getUnconsumedMessages();
|
||||||
LOG.debug("Send " + unconsumedMessages.size() + " unconsumed messages");
|
LOG.debug("Send " + unconsumedMessages.size() + " unconsumed messages");
|
||||||
for (Message msg : unconsumedMessages) {
|
synchronized( unconsumedMessages ) {
|
||||||
messages++;
|
for (Iterator<Message> it = unconsumedMessages.iterator(); it.hasNext(); ) {
|
||||||
String id = consumerIdMap.get(consumer);
|
messages++;
|
||||||
String destinationName = consumerDestinationNameMap.get(consumer);
|
Message msg = it.next();
|
||||||
writeMessageResponse(writer, msg, id, destinationName);
|
String id = consumerIdMap.get(consumer);
|
||||||
if (messages >= maximumMessages) {
|
String destinationName = consumerDestinationNameMap.get(consumer);
|
||||||
break;
|
writeMessageResponse(writer, msg, id, destinationName);
|
||||||
|
it.remove();
|
||||||
|
if (messages >= maximumMessages) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue