allow priority, peristence and timeouts to be specified in REST front end and use a 'type=' attribute to make it easier to provide alternative destinations

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@396897 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-04-25 14:19:43 +00:00
parent 8ad6fd36a1
commit 33cf032b38
3 changed files with 102 additions and 72 deletions

View File

@ -95,6 +95,9 @@ public class MessageServlet extends MessageServletSupport {
TextMessage message = client.getSession().createTextMessage(text); TextMessage message = client.getSession().createTextMessage(text);
appendParametersToMessage(request, message); appendParametersToMessage(request, message);
boolean persistent = isSendPersistent(request);
int priority = getSendPriority(request);
long timeToLive = getSendTimeToLive(request);
client.send(destination, message); client.send(destination, message);
// lets return a unique URI for reliable messaging // lets return a unique URI for reliable messaging

View File

@ -52,8 +52,11 @@ public abstract class MessageServletSupport extends HttpServlet {
private boolean defaultTopicFlag = true; private boolean defaultTopicFlag = true;
private Destination defaultDestination; private Destination defaultDestination;
private String destinationParameter = "destination"; private String destinationParameter = "destination";
private String topicParameter = "topic"; private String typeParameter = "type";
private String bodyParameter = "body"; private String bodyParameter = "body";
private boolean defaultMessagePersistent = true;
private int defaultMessagePriority = 5;
private long defaultMessageTimeToLive = 0;
public void init(ServletConfig servletConfig) throws ServletException { public void init(ServletConfig servletConfig) throws ServletException {
@ -144,7 +147,7 @@ public abstract class MessageServletSupport extends HttpServlet {
for (Iterator iter = parameters.entrySet().iterator(); iter.hasNext();) { for (Iterator iter = parameters.entrySet().iterator(); iter.hasNext();) {
Map.Entry entry = (Map.Entry) iter.next(); Map.Entry entry = (Map.Entry) iter.next();
String name = (String) entry.getKey(); String name = (String) entry.getKey();
if (!destinationParameter.equals(name) && !topicParameter.equals(name) && !bodyParameter.equals(name)) { if (!destinationParameter.equals(name) && !typeParameter.equals(name) && !bodyParameter.equals(name)) {
Object value = entry.getValue(); Object value = entry.getValue();
if (value instanceof Object[]) { if (value instanceof Object[]) {
Object[] array = (Object[]) value; Object[] array = (Object[]) value;
@ -164,7 +167,26 @@ public abstract class MessageServletSupport extends HttpServlet {
} }
} }
} }
}
protected long getSendTimeToLive(HttpServletRequest request) {
String text = request.getParameter("JMSTimeToLive");
if (text != null) {
return asLong(text);
}
return defaultMessageTimeToLive;
}
protected int getSendPriority(HttpServletRequest request) {
String text = request.getParameter("JMSPriority");
if (text != null) {
return asInt(text);
}
return defaultMessagePriority;
}
protected boolean isSendPersistent(HttpServletRequest request) {
return defaultMessagePersistent;
} }
protected Destination asDestination(Object value) { protected Destination asDestination(Object value) {
@ -208,6 +230,14 @@ public abstract class MessageServletSupport extends HttpServlet {
return null; return null;
} }
protected long asLong(String name) {
return Long.parseLong(name);
}
protected int asInt(String name) {
return Integer.parseInt(name);
}
protected String asString(Object value) { protected String asString(Object value) {
if (value instanceof String[]) { if (value instanceof String[]) {
return ((String[])value)[0]; return ((String[])value)[0];
@ -291,16 +321,11 @@ public abstract class MessageServletSupport extends HttpServlet {
* @return true if the current request is for a topic destination, else false if its for a queue * @return true if the current request is for a topic destination, else false if its for a queue
*/ */
protected boolean isTopic(HttpServletRequest request) { protected boolean isTopic(HttpServletRequest request) {
boolean aTopic = defaultTopicFlag; String typeText = request.getParameter(typeParameter);
String aTopicText = request.getParameter(topicParameter); if (typeText == null) {
if (aTopicText != null) { return defaultTopicFlag;
aTopic = asBoolean(aTopicText);
} }
return aTopic; return typeText.equalsIgnoreCase("topic");
}
protected long asLong(String name) {
return Long.parseLong(name);
} }
/** /**

View File

@ -52,12 +52,12 @@ import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.Semaphore; import edu.emory.mathcs.backport.java.util.concurrent.Semaphore;
/** /**
* Represents a messaging client used from inside a web container * Represents a messaging client used from inside a web container typically
* typically stored inside a HttpSession * stored inside a HttpSession
* *
* TODO controls to prevent DOS attacks with users requesting many consumers * TODO controls to prevent DOS attacks with users requesting many consumers
* TODO configure consumers with small prefetch. * TODO configure consumers with small prefetch.
* *
* @version $Revision: 1.1.1.1 $ * @version $Revision: 1.1.1.1 $
*/ */
public class WebClient implements HttpSessionActivationListener, HttpSessionBindingListener, Externalizable { public class WebClient implements HttpSessionActivationListener, HttpSessionBindingListener, Externalizable {
@ -68,8 +68,7 @@ public class WebClient implements HttpSessionActivationListener, HttpSessionBind
private static final Log log = LogFactory.getLog(WebClient.class); private static final Log log = LogFactory.getLog(WebClient.class);
private static transient ConnectionFactory factory; private static transient ConnectionFactory factory;
private transient Map consumers = new HashMap(); private transient Map consumers = new HashMap();
private transient ActiveMQConnection connection; private transient ActiveMQConnection connection;
private transient ActiveMQSession session; private transient ActiveMQSession session;
@ -78,15 +77,14 @@ public class WebClient implements HttpSessionActivationListener, HttpSessionBind
private final Semaphore semaphore = new Semaphore(1); private final Semaphore semaphore = new Semaphore(1);
/** /**
* @return the web client for the current HTTP session or null if there is not a web client created yet * @return the web client for the current HTTP session or null if there is
* not a web client created yet
*/ */
public static WebClient getWebClient(HttpSession session) { public static WebClient getWebClient(HttpSession session) {
return (WebClient) session.getAttribute(webClientAttribute); return (WebClient) session.getAttribute(webClientAttribute);
} }
public static void initContext(ServletContext context) { public static void initContext(ServletContext context) {
initConnectionFactory(context); initConnectionFactory(context);
} }
@ -94,34 +92,29 @@ public class WebClient implements HttpSessionActivationListener, HttpSessionBind
/** /**
*/ */
public WebClient() { public WebClient() {
if (factory==null) if (factory == null)
throw new IllegalStateException("initContext(ServletContext) not called"); throw new IllegalStateException("initContext(ServletContext) not called");
} }
public int getDeliveryMode() { public int getDeliveryMode() {
return deliveryMode; return deliveryMode;
} }
public void setDeliveryMode(int deliveryMode) { public void setDeliveryMode(int deliveryMode) {
this.deliveryMode = deliveryMode; this.deliveryMode = deliveryMode;
} }
public synchronized void closeConsumers() {
public synchronized void closeConsumers()
{
for (Iterator it = consumers.values().iterator(); it.hasNext();) { for (Iterator it = consumers.values().iterator(); it.hasNext();) {
MessageConsumer consumer = (MessageConsumer) it.next(); MessageConsumer consumer = (MessageConsumer) it.next();
it.remove(); it.remove();
try{ try {
consumer.setMessageListener(null); consumer.setMessageListener(null);
if (consumer instanceof MessageAvailableConsumer) if (consumer instanceof MessageAvailableConsumer)
((MessageAvailableConsumer)consumer).setAvailableListener(null); ((MessageAvailableConsumer) consumer).setAvailableListener(null);
consumer.close(); consumer.close();
} }
catch(JMSException e) catch (JMSException e) {
{
e.printStackTrace(); e.printStackTrace();
} }
} }
@ -130,55 +123,51 @@ public class WebClient implements HttpSessionActivationListener, HttpSessionBind
public synchronized void close() { public synchronized void close() {
try { try {
closeConsumers(); closeConsumers();
if (connection!=null) if (connection != null)
connection.close(); connection.close();
} catch (JMSException e) { }
catch (JMSException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
finally { finally {
producer = null; producer = null;
session = null; session = null;
connection = null; connection = null;
if (consumers!=null) if (consumers != null)
consumers.clear(); consumers.clear();
consumers=null; consumers = null;
} }
} }
public boolean isClosed() public boolean isClosed() {
{ return consumers == null;
return consumers==null;
} }
public void writeExternal(ObjectOutput out) throws IOException { public void writeExternal(ObjectOutput out) throws IOException {
if (consumers != null) {
if (consumers!=null)
{
out.write(consumers.size()); out.write(consumers.size());
Iterator i=consumers.keySet().iterator(); Iterator i = consumers.keySet().iterator();
while(i.hasNext()) while (i.hasNext())
out.writeObject(i.next().toString()); out.writeObject(i.next().toString());
} }
else else
out.write(-1); out.write(-1);
} }
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
int size = in.readInt(); int size = in.readInt();
if (size >=0) { if (size >= 0) {
consumers = new HashMap(); consumers = new HashMap();
for (int i=0;i<size;i++) { for (int i = 0; i < size; i++) {
String destinationName = in.readObject().toString(); String destinationName = in.readObject().toString();
try{ try {
Destination destination = destinationName.startsWith("topic://") Destination destination = destinationName.startsWith("topic://") ? (Destination) getSession().createTopic(destinationName)
?(Destination)getSession().createTopic(destinationName) : (Destination) getSession().createQueue(destinationName);
:(Destination)getSession().createQueue(destinationName); consumers.put(destination, getConsumer(destination, true));
consumers.put(destination,getConsumer(destination, true));
} }
catch (JMSException e) catch (JMSException e) {
{
e.printStackTrace(); // TODO better handling? e.printStackTrace(); // TODO better handling?
} }
} }
@ -186,11 +175,15 @@ public class WebClient implements HttpSessionActivationListener, HttpSessionBind
} }
public void send(Destination destination, Message message) throws JMSException { public void send(Destination destination, Message message) throws JMSException {
if (producer == null) { getProducer().send(destination, message);
producer = getSession().createProducer(null); if (log.isDebugEnabled()) {
producer.setDeliveryMode(deliveryMode ); log.debug("Sent! to destination: " + destination + " message: " + message);
} }
producer.send(destination, message); }
public void send(Destination destination, Message message, boolean persistent, int priority, int timeToLive) throws JMSException {
int deliveryMode = persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
getProducer().send(destination, message, deliveryMode, priority, timeToLive);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Sent! to destination: " + destination + " message: " + message); log.debug("Sent! to destination: " + destination + " message: " + message);
} }
@ -212,30 +205,41 @@ public class WebClient implements HttpSessionActivationListener, HttpSessionBind
} }
public static synchronized void initConnectionFactory(ServletContext servletContext) { public static synchronized void initConnectionFactory(ServletContext servletContext) {
if (factory==null) if (factory == null)
factory = (ConnectionFactory) servletContext.getAttribute(connectionFactoryAttribute); factory = (ConnectionFactory) servletContext.getAttribute(connectionFactoryAttribute);
if (factory == null) { if (factory == null) {
String brokerURL = servletContext.getInitParameter(brokerUrlInitParam); String brokerURL = servletContext.getInitParameter(brokerUrlInitParam);
servletContext.log("Value of: " + brokerUrlInitParam + " is: " + brokerURL); servletContext.log("Value of: " + brokerUrlInitParam + " is: " + brokerURL);
if (brokerURL == null) { if (brokerURL == null) {
brokerURL = "vm://localhost"; brokerURL = "vm://localhost";
} }
ActiveMQConnectionFactory amqfactory = new ActiveMQConnectionFactory(brokerURL); ActiveMQConnectionFactory amqfactory = new ActiveMQConnectionFactory(brokerURL);
factory = amqfactory; factory = amqfactory;
servletContext.setAttribute(connectionFactoryAttribute, factory); servletContext.setAttribute(connectionFactoryAttribute, factory);
} }
} }
public synchronized MessageProducer getProducer() throws JMSException {
if (producer == null) {
producer = getSession().createProducer(null);
producer.setDeliveryMode(deliveryMode);
}
return producer;
}
public void setProducer(MessageProducer producer) {
this.producer = producer;
}
public synchronized MessageConsumer getConsumer(Destination destination) throws JMSException { public synchronized MessageConsumer getConsumer(Destination destination) throws JMSException {
return getConsumer(destination,true); return getConsumer(destination, true);
} }
public synchronized MessageConsumer getConsumer(Destination destination, boolean create) throws JMSException { public synchronized MessageConsumer getConsumer(Destination destination, boolean create) throws JMSException {
MessageConsumer consumer = (MessageConsumer) consumers.get(destination); MessageConsumer consumer = (MessageConsumer) consumers.get(destination);
if (create && consumer == null) { if (create && consumer == null) {
consumer = getSession().createConsumer(destination); consumer = getSession().createConsumer(destination);
@ -250,13 +254,12 @@ public class WebClient implements HttpSessionActivationListener, HttpSessionBind
consumers.remove(destination); consumers.remove(destination);
consumer.setMessageListener(null); consumer.setMessageListener(null);
if (consumer instanceof MessageAvailableConsumer) if (consumer instanceof MessageAvailableConsumer)
((MessageAvailableConsumer)consumer).setAvailableListener(null); ((MessageAvailableConsumer) consumer).setAvailableListener(null);
consumer.close(); consumer.close();
} }
} }
public synchronized List getConsumers() public synchronized List getConsumers() {
{
return new ArrayList(consumers.values()); return new ArrayList(consumers.values());
} }
@ -264,7 +267,6 @@ public class WebClient implements HttpSessionActivationListener, HttpSessionBind
return (ActiveMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE); return (ActiveMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
} }
public Semaphore getSemaphore() { public Semaphore getSemaphore() {
return semaphore; return semaphore;
} }