git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@943146 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2010-05-11 15:37:44 +00:00
parent 0e1d63269d
commit 2b9c59c283
4 changed files with 155 additions and 90 deletions

View File

@ -41,7 +41,7 @@ public class WaitForJettyListener {
socket.close(); socket.close();
canConnect = true; canConnect = true;
} catch (Exception e) { } catch (Exception e) {
LOG.warn("verify jettty available, failed to connect to " + url + e); LOG.warn("verify jetty available, failed to connect to " + url + e);
} }
return canConnect; return canConnect;
}}, 60 * 1000)); }}, 60 * 1000));

View File

@ -1,19 +1,27 @@
package org.apache.activemq.web; package org.apache.activemq.web;
import java.net.Socket;
import java.net.URL;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import javax.net.SocketFactory;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.nio.SelectChannelConnector; import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.webapp.WebAppContext; import org.eclipse.jetty.webapp.WebAppContext;
import junit.framework.TestCase;
public class JettyTestSupport extends TestCase { public class JettyTestSupport extends TestCase {
private static final Log LOG = LogFactory.getLog(JettyTestSupport.class);
BrokerService broker; BrokerService broker;
Server server; Server server;
@ -44,6 +52,7 @@ public class JettyTestSupport extends TestCase {
connector connector
}); });
server.start(); server.start();
waitForJettySocketToAccept("http://localhost:8080");
factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
connection = factory.createConnection(); connection = factory.createConnection();
@ -60,6 +69,21 @@ public class JettyTestSupport extends TestCase {
connection.close(); connection.close();
} }
public void waitForJettySocketToAccept(String bindLocation) throws Exception {
final URL url = new URL(bindLocation);
assertTrue("Jetty endpoint is available", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
boolean canConnect = false;
try {
Socket socket = SocketFactory.getDefault().createSocket(url.getHost(), url.getPort());
socket.close();
canConnect = true;
} catch (Exception e) {
LOG.warn("verify jetty available, failed to connect to " + url + e);
}
return canConnect;
}}, 60 * 1000));
}
} }

View File

@ -2,13 +2,17 @@ package org.apache.activemq.web;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.jetty.client.ContentExchange; import org.eclipse.jetty.client.ContentExchange;
import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.HttpClient;
public class RestTest extends JettyTestSupport { public class RestTest extends JettyTestSupport {
private static final Log LOG = LogFactory.getLog(RestTest.class);
public void testConsume() throws Exception { public void testConsume() throws Exception {
producer.send(session.createTextMessage("test")); producer.send(session.createTextMessage("test"));
LOG.info("message sent");
HttpClient httpClient = new HttpClient(); HttpClient httpClient = new HttpClient();
httpClient.start(); httpClient.start();
@ -18,17 +22,35 @@ public class RestTest extends JettyTestSupport {
httpClient.send(contentExchange); httpClient.send(contentExchange);
contentExchange.waitForDone(); contentExchange.waitForDone();
assertEquals("test", contentExchange.getResponseContent()); assertEquals("test", contentExchange.getResponseContent());
}
public void testSubscribeFirst() throws Exception {
HttpClient httpClient = new HttpClient();
httpClient.start();
ContentExchange contentExchange = new ContentExchange();
httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
contentExchange.setURL("http://localhost:8080/message/test?readTimeout=5000&type=queue");
httpClient.send(contentExchange);
Thread.sleep(1000);
producer.send(session.createTextMessage("test"));
LOG.info("message sent");
contentExchange.waitForDone();
assertEquals("test", contentExchange.getResponseContent());
} }
public void testSelector() throws Exception { public void testSelector() throws Exception {
TextMessage msg1 = session.createTextMessage("test1"); TextMessage msg1 = session.createTextMessage("test1");
msg1.setIntProperty("test", 1); msg1.setIntProperty("test", 1);
producer.send(msg1); producer.send(msg1);
LOG.info("message 1 sent");
TextMessage msg2 = session.createTextMessage("test2"); TextMessage msg2 = session.createTextMessage("test2");
msg2.setIntProperty("test", 2); msg2.setIntProperty("test", 2);
producer.send(msg2); producer.send(msg2);
LOG.info("message 2 sent");
HttpClient httpClient = new HttpClient(); HttpClient httpClient = new HttpClient();
httpClient.start(); httpClient.start();

View File

@ -162,56 +162,70 @@ public class MessageServlet extends MessageServletSupport {
* @throws IOException * @throws IOException
*/ */
protected void doMessages(HttpServletRequest request, HttpServletResponse response, int maxMessages) throws ServletException, IOException { protected void doMessages(HttpServletRequest request, HttpServletResponse response, int maxMessages) throws ServletException, IOException {
int messages = 0;
try { try {
WebClient client = getWebClient(request); WebClient client = getWebClient(request);
Destination destination = getDestination(client, request); Destination destination = getDestination(client, request);
if (destination == null) { if (destination == null) {
throw new NoDestinationSuppliedException(); throw new NoDestinationSuppliedException();
} }
long timeout = getReadTimeout(request); MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName));
boolean ajax = isRicoAjax(request); Message message = null;
if (!ajax) { message = (Message)request.getAttribute("message");
maxMessages = 1; if (message != null) {
// we're resuming continuation,
// so just write the message and return
writeResponse(request, response, maxMessages, message, consumer);
return;
} }
long timeout = getReadTimeout(request);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Receiving message(s) from: " + destination + " with timeout: " + timeout); LOG.debug("Receiving message(s) from: " + destination + " with timeout: " + timeout);
} }
MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName));
Continuation continuation = null; Continuation continuation = null;
Listener listener = null; Listener listener = null;
Message message = null;
synchronized (consumer) {
// Fetch the listeners
listener = (Listener)consumer.getAvailableListener();
if (listener == null) {
listener = new Listener(consumer);
consumer.setAvailableListener(listener);
}
// Look for any available messages // Look for any available messages
message = consumer.receiveNoWait(); message = consumer.receive(10);
// Get an existing Continuation or create a new one if there are // Get an existing Continuation or create a new one if there are
// no events. // no events.
if (message == null) { if (message == null) {
continuation = ContinuationSupport.getContinuation(request); continuation = ContinuationSupport.getContinuation(request);
// register this continuation with our listener. if (continuation.isExpired()) {
listener.setContinuation(continuation); response.setStatus(isRicoAjax(request) ? HttpServletResponse.SC_OK : HttpServletResponse.SC_NO_CONTENT);
return;
// Get the continuation object (may wait and/or retry
// request here).
continuation.setTimeout(timeout);
continuation.suspend();
} }
// Try again now continuation.setTimeout(timeout);
if (message == null) { continuation.suspend();
message = consumer.receiveNoWait();
// Fetch the listeners
listener = (Listener)consumer.getAvailableListener();
if (listener == null) {
listener = new Listener(consumer);
consumer.setAvailableListener(listener);
}
// register this continuation with our listener.
listener.setContinuation(continuation);
}
writeResponse(request, response, maxMessages, message, consumer);
} catch (JMSException e) {
throw new ServletException("Could not post JMS message: " + e, e);
}
}
protected void writeResponse(HttpServletRequest request, HttpServletResponse response, int maxMessages, Message message, MessageAvailableConsumer consumer) throws IOException, JMSException {
int messages = 0;
try {
boolean ajax = isRicoAjax(request);
if (!ajax) {
maxMessages = 1;
} }
// write a responds // write a responds
@ -225,7 +239,8 @@ public class MessageServlet extends MessageServletSupport {
// handle any message(s) // handle any message(s)
if (message == null) { if (message == null) {
// No messages so OK response of for ajax else no content. // No messages so OK response of for ajax else no content.
response.setStatus(ajax ? HttpServletResponse.SC_OK : HttpServletResponse.SC_NO_CONTENT); response.setStatus(ajax ? HttpServletResponse.SC_OK
: HttpServletResponse.SC_NO_CONTENT);
} else { } else {
// We have at least one message so set up the response // We have at least one message so set up the response
response.setStatus(HttpServletResponse.SC_OK); response.setStatus(HttpServletResponse.SC_OK);
@ -236,7 +251,8 @@ public class MessageServlet extends MessageServletSupport {
// send a response for each available message (up to max // send a response for each available message (up to max
// messages) // messages)
while ((maxMessages < 0 || messages < maxMessages) && message != null) { while ((maxMessages < 0 || messages < maxMessages)
&& message != null) {
if (ajax) { if (ajax) {
writer.print("<response type='object' id='"); writer.print("<response type='object' id='");
writer.print(request.getParameter("id")); writer.print(request.getParameter("id"));
@ -261,12 +277,10 @@ public class MessageServlet extends MessageServletSupport {
} }
if (ajax) { if (ajax) {
writer.println("<response type='object' id='poll'><ok/></response>"); writer
.println("<response type='object' id='poll'><ok/></response>");
writer.println("</ajax-response>"); writer.println("</ajax-response>");
} }
}
} catch (JMSException e) {
throw new ServletException("Could not post JMS message: " + e, e);
} finally { } finally {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Received " + messages + " message(s)"); LOG.debug("Received " + messages + " message(s)");
@ -475,9 +489,14 @@ public class MessageServlet extends MessageServletSupport {
synchronized (this.consumer) { synchronized (this.consumer) {
if (continuation != null) { if (continuation != null) {
try {
Message message = consumer.receiveNoWait();
continuation.setAttribute("message", message);
} catch (Exception e) {
e.printStackTrace();
}
continuation.resume(); continuation.resume();
} }
continuation = null;
} }
} }
} }