diff --git a/activemq-web-demo/src/main/webapp/WEB-INF/web.xml b/activemq-web-demo/src/main/webapp/WEB-INF/web.xml index c16f388045..f9e9a5a103 100755 --- a/activemq-web-demo/src/main/webapp/WEB-INF/web.xml +++ b/activemq-web-demo/src/main/webapp/WEB-INF/web.xml @@ -52,6 +52,14 @@ MessageServlet org.apache.activemq.web.MessageServlet 1 + + diff --git a/activemq-web/pom.xml b/activemq-web/pom.xml index 00f876f193..52eb00e6a1 100755 --- a/activemq-web/pom.xml +++ b/activemq-web/pom.xml @@ -36,6 +36,10 @@ ${pom.groupId} activemq-core + + ${pom.groupId} + activemq-camel + ${pom.groupId} activemq-core diff --git a/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java b/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java index 817924ad54..a2354e22bf 100644 --- a/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java +++ b/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java @@ -19,6 +19,7 @@ package org.apache.activemq.web; import java.io.IOException; import java.io.PrintWriter; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -29,12 +30,20 @@ import javax.jms.MessageConsumer; import javax.jms.ObjectMessage; import javax.jms.TextMessage; import javax.servlet.ServletConfig; +import javax.servlet.ServletContext; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.apache.activemq.MessageAvailableConsumer; import org.apache.activemq.MessageAvailableListener; +import org.apache.activemq.camel.converter.ActiveMQMessageConverter; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Producer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.mortbay.util.ajax.Continuation; @@ -56,6 +65,9 @@ public class MessageServlet extends MessageServletSupport { private String readTimeoutParameter = "readTimeout"; private long defaultReadTimeout = -1; private long maximumReadTimeout = 20000; + private long requestTimeout = 1000; + + private HashMap clients = new HashMap(); public void init() throws ServletException { ServletConfig servletConfig = getServletConfig(); @@ -67,6 +79,10 @@ public class MessageServlet extends MessageServletSupport { if (name != null) { maximumReadTimeout = asLong(name); } + name = servletConfig.getInitParameter("replyTimeout"); + if (name != null) { + requestTimeout = asLong(name); + } } /** @@ -80,7 +96,7 @@ public class MessageServlet extends MessageServletSupport { protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { // lets turn the HTTP post into a JMS Message try { - WebClient client = WebClient.getWebClient(request); + WebClient client = getWebClient(request); String text = getPostedMessageBody(request); @@ -94,12 +110,28 @@ public class MessageServlet extends MessageServletSupport { LOG.debug("Sending message to: " + destination + " with text: " + text); } + boolean sync = isSync(request); TextMessage message = client.getSession().createTextMessage(text); - appendParametersToMessage(request, message); - boolean persistent = isSendPersistent(request); - int priority = getSendPriority(request); - long timeToLive = getSendTimeToLive(request); - client.send(destination, message, persistent, priority, timeToLive); + + if (sync) { + String point = "activemq:" + + ((ActiveMQDestination)destination).getPhysicalName().replace("//", "") + + "?requestTimeout=" + requestTimeout; + try { + String body = (String)client.getProducerTemplate().requestBody(point, text); + ActiveMQTextMessage answer = new ActiveMQTextMessage(); + answer.setText(body); + writeMessageResponse(response.getWriter(), answer); + } catch (Exception e) { + throw new IOException(e); + } + } else { + appendParametersToMessage(request, message); + boolean persistent = isSendPersistent(request); + int priority = getSendPriority(request); + long timeToLive = getSendTimeToLive(request); + client.send(destination, message, persistent, priority, timeToLive); + } // lets return a unique URI for reliable messaging response.setHeader("messageID", message.getJMSMessageID()); @@ -137,7 +169,7 @@ public class MessageServlet extends MessageServletSupport { int messages = 0; try { - WebClient client = WebClient.getWebClient(request); + WebClient client = getWebClient(request); Destination destination = getDestination(client, request); if (destination == null) { throw new NoDestinationSuppliedException(); @@ -224,8 +256,10 @@ public class MessageServlet extends MessageServletSupport { } // look for next message - message = consumer.receiveNoWait(); messages++; + if(maxMessages < 0 || messages < maxMessages) { + message = consumer.receiveNoWait(); + } } } @@ -255,7 +289,7 @@ public class MessageServlet extends MessageServletSupport { int messages = 0; try { - WebClient client = WebClient.getWebClient(request); + WebClient client = getWebClient(request); Destination destination = getDestination(client, request); long timeout = getReadTimeout(request); boolean ajax = isRicoAjax(request); @@ -317,8 +351,11 @@ public class MessageServlet extends MessageServletSupport { } // look for next message - message = consumer.receiveNoWait(); messages++; + if(maxMessages < 0 || messages < maxMessages) { + message = consumer.receiveNoWait(); + } + } } } finally { @@ -362,6 +399,25 @@ public class MessageServlet extends MessageServletSupport { String rico = request.getParameter("rico"); return rico != null && rico.equals("true"); } + + public WebClient getWebClient(HttpServletRequest request) { + String clientId = request.getParameter("clientId"); + if (clientId != null) { + synchronized(this) { + LOG.debug("Getting local client [" + clientId + "]"); + WebClient client = clients.get(clientId); + if (client == null) { + LOG.debug("Creating new client [" + clientId + "]"); + client = new WebClient(); + clients.put(clientId, client); + } + return client; + } + + } else { + return WebClient.getWebClient(request); + } + } protected String getContentType(HttpServletRequest request) { /* diff --git a/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java b/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java index d1e2221950..8de693e712 100644 --- a/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java +++ b/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java @@ -179,6 +179,14 @@ public abstract class MessageServletSupport extends HttpServlet { return defaultMessagePersistent; } + protected boolean isSync(HttpServletRequest request) { + String text = request.getParameter("sync"); + if (text != null) { + return true; + } + return false; + } + protected Destination asDestination(Object value) { if (value instanceof Destination) { return (Destination)value; diff --git a/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java b/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java index 9d958b2dda..6c8d9e074a 100644 --- a/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java +++ b/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java @@ -47,9 +47,17 @@ import javax.servlet.http.HttpSessionEvent; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.MessageAvailableConsumer; +import org.apache.activemq.camel.component.ActiveMQComponent; +import org.apache.activemq.camel.component.ActiveMQConfiguration; +import org.apache.activemq.pool.PooledConnectionFactory; +import org.apache.camel.CamelContext; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.impl.DefaultCamelContext; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import sun.util.logging.resources.logging; + /** * Represents a messaging client used from inside a web container typically * stored inside a HttpSession TODO controls to prevent DOS attacks with users @@ -77,6 +85,9 @@ public class WebClient implements HttpSessionActivationListener, HttpSessionBind private final Semaphore semaphore = new Semaphore(1); + private CamelContext camelContext; + private ProducerTemplate producerTemplate; + public WebClient() { if (factory == null) { throw new IllegalStateException("initContext(ServletContext) not called"); @@ -111,6 +122,7 @@ public class WebClient implements HttpSessionActivationListener, HttpSessionBind public static void initContext(ServletContext context) { initConnectionFactory(context); + context.setAttribute("webClients", new HashMap()); } public int getDeliveryMode() { @@ -143,12 +155,16 @@ public class WebClient implements HttpSessionActivationListener, HttpSessionBind if (connection != null) { connection.close(); } - } catch (JMSException e) { + if (producerTemplate != null) { + producerTemplate.stop(); + } + } catch (Exception e) { LOG.debug("caught exception closing consumer", e); } finally { producer = null; session = null; connection = null; + producerTemplate = null; if (consumers != null) { consumers.clear(); } @@ -256,6 +272,27 @@ public class WebClient implements HttpSessionActivationListener, HttpSessionBind servletContext.setAttribute(CONNECTION_FACTORY_ATTRIBUTE, factory); } } + + public synchronized CamelContext getCamelContext() { + if (camelContext == null) { + LOG.debug("Creating camel context"); + camelContext = new DefaultCamelContext(); + ActiveMQConfiguration conf = new ActiveMQConfiguration(); + conf.setConnectionFactory(new PooledConnectionFactory((ActiveMQConnectionFactory)factory)); + ActiveMQComponent component = new ActiveMQComponent(conf); + camelContext.addComponent("activemq", component); + } + return camelContext; + } + + public synchronized ProducerTemplate getProducerTemplate() throws Exception { + if (producerTemplate == null) { + LOG.debug("Creating producer template"); + producerTemplate = getCamelContext().createProducerTemplate(); + producerTemplate.start(); + } + return producerTemplate; + } public synchronized MessageProducer getProducer() throws JMSException { if (producer == null) {