diff --git a/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java b/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java index 2568e465f6..376eedd2a7 100644 --- a/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java +++ b/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java @@ -95,6 +95,9 @@ public class MessageServlet extends MessageServletSupport { TextMessage message = client.getSession().createTextMessage(text); appendParametersToMessage(request, message); + boolean persistent = isSendPersistent(request); + int priority = getSendPriority(request); + long timeToLive = getSendTimeToLive(request); client.send(destination, message); // lets return a unique URI for reliable messaging diff --git a/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java b/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java index d264971d2b..8431b9ba18 100644 --- a/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java +++ b/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java @@ -52,8 +52,11 @@ public abstract class MessageServletSupport extends HttpServlet { private boolean defaultTopicFlag = true; private Destination defaultDestination; private String destinationParameter = "destination"; - private String topicParameter = "topic"; + private String typeParameter = "type"; private String bodyParameter = "body"; + private boolean defaultMessagePersistent = true; + private int defaultMessagePriority = 5; + private long defaultMessageTimeToLive = 0; public void init(ServletConfig servletConfig) throws ServletException { @@ -144,7 +147,7 @@ public abstract class MessageServletSupport extends HttpServlet { for (Iterator iter = parameters.entrySet().iterator(); iter.hasNext();) { Map.Entry entry = (Map.Entry) iter.next(); String name = (String) entry.getKey(); - if (!destinationParameter.equals(name) && !topicParameter.equals(name) && !bodyParameter.equals(name)) { + if (!destinationParameter.equals(name) && !typeParameter.equals(name) && !bodyParameter.equals(name)) { Object value = entry.getValue(); if (value instanceof Object[]) { Object[] array = (Object[]) value; @@ -164,7 +167,26 @@ public abstract class MessageServletSupport extends HttpServlet { } } } - + } + + protected long getSendTimeToLive(HttpServletRequest request) { + String text = request.getParameter("JMSTimeToLive"); + if (text != null) { + return asLong(text); + } + return defaultMessageTimeToLive; + } + + protected int getSendPriority(HttpServletRequest request) { + String text = request.getParameter("JMSPriority"); + if (text != null) { + return asInt(text); + } + return defaultMessagePriority; + } + + protected boolean isSendPersistent(HttpServletRequest request) { + return defaultMessagePersistent; } protected Destination asDestination(Object value) { @@ -208,6 +230,14 @@ public abstract class MessageServletSupport extends HttpServlet { return null; } + protected long asLong(String name) { + return Long.parseLong(name); + } + + protected int asInt(String name) { + return Integer.parseInt(name); + } + protected String asString(Object value) { if (value instanceof String[]) { return ((String[])value)[0]; @@ -291,16 +321,11 @@ public abstract class MessageServletSupport extends HttpServlet { * @return true if the current request is for a topic destination, else false if its for a queue */ protected boolean isTopic(HttpServletRequest request) { - boolean aTopic = defaultTopicFlag; - String aTopicText = request.getParameter(topicParameter); - if (aTopicText != null) { - aTopic = asBoolean(aTopicText); + String typeText = request.getParameter(typeParameter); + if (typeText == null) { + return defaultTopicFlag; } - return aTopic; - } - - protected long asLong(String name) { - return Long.parseLong(name); + return typeText.equalsIgnoreCase("topic"); } /** diff --git a/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java b/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java index 656529a7aa..5a39203693 100644 --- a/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java +++ b/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java @@ -52,12 +52,12 @@ import org.apache.commons.logging.LogFactory; import edu.emory.mathcs.backport.java.util.concurrent.Semaphore; /** - * Represents a messaging client used from inside a web container - * typically stored inside a HttpSession + * Represents a messaging client used from inside a web container typically + * stored inside a HttpSession * - * TODO controls to prevent DOS attacks with users requesting many consumers + * TODO controls to prevent DOS attacks with users requesting many consumers * TODO configure consumers with small prefetch. - * + * * @version $Revision: 1.1.1.1 $ */ public class WebClient implements HttpSessionActivationListener, HttpSessionBindingListener, Externalizable { @@ -68,8 +68,7 @@ public class WebClient implements HttpSessionActivationListener, HttpSessionBind private static final Log log = LogFactory.getLog(WebClient.class); private static transient ConnectionFactory factory; - - + private transient Map consumers = new HashMap(); private transient ActiveMQConnection connection; private transient ActiveMQSession session; @@ -78,15 +77,14 @@ public class WebClient implements HttpSessionActivationListener, HttpSessionBind private final Semaphore semaphore = new Semaphore(1); - /** - * @return the web client for the current HTTP session or null if there is not a web client created yet + * @return the web client for the current HTTP session or null if there is + * not a web client created yet */ public static WebClient getWebClient(HttpSession session) { return (WebClient) session.getAttribute(webClientAttribute); } - public static void initContext(ServletContext context) { initConnectionFactory(context); } @@ -94,34 +92,29 @@ public class WebClient implements HttpSessionActivationListener, HttpSessionBind /** */ public WebClient() { - if (factory==null) + if (factory == null) throw new IllegalStateException("initContext(ServletContext) not called"); } - public int getDeliveryMode() { return deliveryMode; } - public void setDeliveryMode(int deliveryMode) { this.deliveryMode = deliveryMode; } - - public synchronized void closeConsumers() - { + public synchronized void closeConsumers() { for (Iterator it = consumers.values().iterator(); it.hasNext();) { MessageConsumer consumer = (MessageConsumer) it.next(); it.remove(); - try{ + try { consumer.setMessageListener(null); if (consumer instanceof MessageAvailableConsumer) - ((MessageAvailableConsumer)consumer).setAvailableListener(null); + ((MessageAvailableConsumer) consumer).setAvailableListener(null); consumer.close(); } - catch(JMSException e) - { + catch (JMSException e) { e.printStackTrace(); } } @@ -130,55 +123,51 @@ public class WebClient implements HttpSessionActivationListener, HttpSessionBind public synchronized void close() { try { closeConsumers(); - if (connection!=null) + if (connection != null) connection.close(); - } catch (JMSException e) { + } + catch (JMSException e) { throw new RuntimeException(e); } finally { producer = null; session = null; connection = null; - if (consumers!=null) + if (consumers != null) consumers.clear(); - consumers=null; + consumers = null; } } - - public boolean isClosed() - { - return consumers==null; + + public boolean isClosed() { + return consumers == null; } public void writeExternal(ObjectOutput out) throws IOException { - - if (consumers!=null) - { + if (consumers != null) { out.write(consumers.size()); - Iterator i=consumers.keySet().iterator(); - while(i.hasNext()) + Iterator i = consumers.keySet().iterator(); + while (i.hasNext()) out.writeObject(i.next().toString()); } else out.write(-1); - + } public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { int size = in.readInt(); - if (size >=0) { + if (size >= 0) { consumers = new HashMap(); - for (int i=0;i