cumulative commit for issues AMQ-1955, AMQ-1453 and AMQ-1960

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@703161 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2008-10-09 13:17:17 +00:00
parent 9fbba8d1f8
commit 2b2b3e8e07
5 changed files with 124 additions and 11 deletions

View File

@ -52,6 +52,14 @@
<servlet-name>MessageServlet</servlet-name>
<servlet-class>org.apache.activemq.web.MessageServlet</servlet-class>
<load-on-startup>1</load-on-startup>
<!--
Uncomment this parameter if you plan to use multiple consumers over REST
<init-param>
<param-name>destinationOptions</param-name>
<param-value>consumer.prefetchSize=1</param-value>
</init-param>
-->
</servlet>
<!-- the queue browse servlet -->

View File

@ -36,6 +36,10 @@
<groupId>${pom.groupId}</groupId>
<artifactId>activemq-core</artifactId>
</dependency>
<dependency>
<groupId>${pom.groupId}</groupId>
<artifactId>activemq-camel</artifactId>
</dependency>
<dependency>
<groupId>${pom.groupId}</groupId>
<artifactId>activemq-core</artifactId>

View File

@ -19,6 +19,7 @@ package org.apache.activemq.web;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@ -29,12 +30,20 @@ import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.TextMessage;
import javax.servlet.ServletConfig;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.activemq.MessageAvailableConsumer;
import org.apache.activemq.MessageAvailableListener;
import org.apache.activemq.camel.converter.ActiveMQMessageConverter;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Producer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.mortbay.util.ajax.Continuation;
@ -56,6 +65,9 @@ public class MessageServlet extends MessageServletSupport {
private String readTimeoutParameter = "readTimeout";
private long defaultReadTimeout = -1;
private long maximumReadTimeout = 20000;
private long requestTimeout = 1000;
private HashMap<String, WebClient> clients = new HashMap<String, WebClient>();
public void init() throws ServletException {
ServletConfig servletConfig = getServletConfig();
@ -67,6 +79,10 @@ public class MessageServlet extends MessageServletSupport {
if (name != null) {
maximumReadTimeout = asLong(name);
}
name = servletConfig.getInitParameter("replyTimeout");
if (name != null) {
requestTimeout = asLong(name);
}
}
/**
@ -80,7 +96,7 @@ public class MessageServlet extends MessageServletSupport {
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
// lets turn the HTTP post into a JMS Message
try {
WebClient client = WebClient.getWebClient(request);
WebClient client = getWebClient(request);
String text = getPostedMessageBody(request);
@ -94,12 +110,28 @@ public class MessageServlet extends MessageServletSupport {
LOG.debug("Sending message to: " + destination + " with text: " + text);
}
boolean sync = isSync(request);
TextMessage message = client.getSession().createTextMessage(text);
appendParametersToMessage(request, message);
boolean persistent = isSendPersistent(request);
int priority = getSendPriority(request);
long timeToLive = getSendTimeToLive(request);
client.send(destination, message, persistent, priority, timeToLive);
if (sync) {
String point = "activemq:"
+ ((ActiveMQDestination)destination).getPhysicalName().replace("//", "")
+ "?requestTimeout=" + requestTimeout;
try {
String body = (String)client.getProducerTemplate().requestBody(point, text);
ActiveMQTextMessage answer = new ActiveMQTextMessage();
answer.setText(body);
writeMessageResponse(response.getWriter(), answer);
} catch (Exception e) {
throw new IOException(e);
}
} else {
appendParametersToMessage(request, message);
boolean persistent = isSendPersistent(request);
int priority = getSendPriority(request);
long timeToLive = getSendTimeToLive(request);
client.send(destination, message, persistent, priority, timeToLive);
}
// lets return a unique URI for reliable messaging
response.setHeader("messageID", message.getJMSMessageID());
@ -137,7 +169,7 @@ public class MessageServlet extends MessageServletSupport {
int messages = 0;
try {
WebClient client = WebClient.getWebClient(request);
WebClient client = getWebClient(request);
Destination destination = getDestination(client, request);
if (destination == null) {
throw new NoDestinationSuppliedException();
@ -224,8 +256,10 @@ public class MessageServlet extends MessageServletSupport {
}
// look for next message
message = consumer.receiveNoWait();
messages++;
if(maxMessages < 0 || messages < maxMessages) {
message = consumer.receiveNoWait();
}
}
}
@ -255,7 +289,7 @@ public class MessageServlet extends MessageServletSupport {
int messages = 0;
try {
WebClient client = WebClient.getWebClient(request);
WebClient client = getWebClient(request);
Destination destination = getDestination(client, request);
long timeout = getReadTimeout(request);
boolean ajax = isRicoAjax(request);
@ -317,8 +351,11 @@ public class MessageServlet extends MessageServletSupport {
}
// look for next message
message = consumer.receiveNoWait();
messages++;
if(maxMessages < 0 || messages < maxMessages) {
message = consumer.receiveNoWait();
}
}
}
} finally {
@ -362,6 +399,25 @@ public class MessageServlet extends MessageServletSupport {
String rico = request.getParameter("rico");
return rico != null && rico.equals("true");
}
public WebClient getWebClient(HttpServletRequest request) {
String clientId = request.getParameter("clientId");
if (clientId != null) {
synchronized(this) {
LOG.debug("Getting local client [" + clientId + "]");
WebClient client = clients.get(clientId);
if (client == null) {
LOG.debug("Creating new client [" + clientId + "]");
client = new WebClient();
clients.put(clientId, client);
}
return client;
}
} else {
return WebClient.getWebClient(request);
}
}
protected String getContentType(HttpServletRequest request) {
/*

View File

@ -179,6 +179,14 @@ public abstract class MessageServletSupport extends HttpServlet {
return defaultMessagePersistent;
}
protected boolean isSync(HttpServletRequest request) {
String text = request.getParameter("sync");
if (text != null) {
return true;
}
return false;
}
protected Destination asDestination(Object value) {
if (value instanceof Destination) {
return (Destination)value;

View File

@ -47,9 +47,17 @@ import javax.servlet.http.HttpSessionEvent;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.MessageAvailableConsumer;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.activemq.camel.component.ActiveMQConfiguration;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import sun.util.logging.resources.logging;
/**
* Represents a messaging client used from inside a web container typically
* stored inside a HttpSession TODO controls to prevent DOS attacks with users
@ -77,6 +85,9 @@ public class WebClient implements HttpSessionActivationListener, HttpSessionBind
private final Semaphore semaphore = new Semaphore(1);
private CamelContext camelContext;
private ProducerTemplate producerTemplate;
public WebClient() {
if (factory == null) {
throw new IllegalStateException("initContext(ServletContext) not called");
@ -111,6 +122,7 @@ public class WebClient implements HttpSessionActivationListener, HttpSessionBind
public static void initContext(ServletContext context) {
initConnectionFactory(context);
context.setAttribute("webClients", new HashMap<String, WebClient>());
}
public int getDeliveryMode() {
@ -143,12 +155,16 @@ public class WebClient implements HttpSessionActivationListener, HttpSessionBind
if (connection != null) {
connection.close();
}
} catch (JMSException e) {
if (producerTemplate != null) {
producerTemplate.stop();
}
} catch (Exception e) {
LOG.debug("caught exception closing consumer", e);
} finally {
producer = null;
session = null;
connection = null;
producerTemplate = null;
if (consumers != null) {
consumers.clear();
}
@ -256,6 +272,27 @@ public class WebClient implements HttpSessionActivationListener, HttpSessionBind
servletContext.setAttribute(CONNECTION_FACTORY_ATTRIBUTE, factory);
}
}
public synchronized CamelContext getCamelContext() {
if (camelContext == null) {
LOG.debug("Creating camel context");
camelContext = new DefaultCamelContext();
ActiveMQConfiguration conf = new ActiveMQConfiguration();
conf.setConnectionFactory(new PooledConnectionFactory((ActiveMQConnectionFactory)factory));
ActiveMQComponent component = new ActiveMQComponent(conf);
camelContext.addComponent("activemq", component);
}
return camelContext;
}
public synchronized ProducerTemplate getProducerTemplate() throws Exception {
if (producerTemplate == null) {
LOG.debug("Creating producer template");
producerTemplate = getCamelContext().createProducerTemplate();
producerTemplate.start();
}
return producerTemplate;
}
public synchronized MessageProducer getProducer() throws JMSException {
if (producer == null) {