git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1177026 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2011-09-28 19:36:02 +00:00
parent 02f63c9782
commit d9124246b7
1 changed files with 39 additions and 35 deletions

View File

@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.web;
import java.io.IOException;
@ -35,7 +34,6 @@ import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;
import org.apache.activemq.MessageAvailableConsumer;
import org.apache.activemq.MessageAvailableListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.eclipse.jetty.continuation.Continuation;
@ -59,19 +57,20 @@ import org.eclipse.jetty.continuation.ContinuationSupport;
* <dt></dt>
* <dd></dd>
* </dl>
*
*
*
*
*/
@SuppressWarnings("serial")
public class MessageListenerServlet extends MessageServletSupport {
private static final Logger LOG = LoggerFactory.getLogger(MessageListenerServlet.class);
private String readTimeoutParameter = "timeout";
private long defaultReadTimeout = -1;
private long maximumReadTimeout = 25000;
private int maximumMessages = 100;
private Timer clientCleanupTimer = new Timer();
private HashMap<String,AjaxWebClient> ajaxWebClients = new HashMap<String,AjaxWebClient>();
public void init() throws ServletException {
ServletConfig servletConfig = getServletConfig();
String name = servletConfig.getInitParameter("defaultReadTimeout");
@ -88,7 +87,7 @@ public class MessageListenerServlet extends MessageServletSupport {
}
clientCleanupTimer.schedule( new ClientCleaner(), 5000, 60000 );
}
/**
* Sends a message to a destination or manage subscriptions. If the the
* content type of the POST is
@ -102,7 +101,7 @@ public class MessageListenerServlet extends MessageServletSupport {
* the content type is not <code>application/x-www-form-urlencoded</code>,
* then the body of the post is sent as the message to a destination that is
* derived from a query parameter, the URL or the default destination.
*
*
* @param request
* @param response
* @throws ServletException
@ -166,7 +165,7 @@ public class MessageListenerServlet extends MessageServletSupport {
}
} else if ("unlisten".equals(type)) {
Map<MessageAvailableConsumer, String> consumerIdMap = client.getIdMap();
Map consumerDestinationNameMap = client.getDestinationNameMap();
Map<MessageAvailableConsumer, String> consumerDestinationNameMap = client.getDestinationNameMap();
MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName));
consumer.setAvailableListener(null);
@ -246,7 +245,7 @@ public class MessageListenerServlet extends MessageServletSupport {
/**
* Reads a message from a destination up to some specific timeout period
*
*
* @param client The webclient
* @param request
* @param response
@ -262,7 +261,7 @@ public class MessageListenerServlet extends MessageServletSupport {
if (LOG.isDebugEnabled()) {
LOG.debug("doMessage timeout=" + timeout);
}
// this is non-null if we're resuming the continuation.
// attributes set in AjaxListener
UndeliveredAjaxMessage undelivered_message = null;
@ -271,10 +270,10 @@ public class MessageListenerServlet extends MessageServletSupport {
if( undelivered_message != null ) {
message = (Message)undelivered_message.getMessage();
}
synchronized (client) {
List consumers = client.getConsumers();
List<MessageConsumer> consumers = client.getConsumers();
MessageAvailableConsumer consumer = null;
if( undelivered_message != null ) {
consumer = (MessageAvailableConsumer)undelivered_message.getConsumer();
@ -287,7 +286,7 @@ public class MessageListenerServlet extends MessageServletSupport {
if (consumer.getAvailableListener() == null) {
continue;
}
// Look for any available messages
message = consumer.receive(10);
if (LOG.isDebugEnabled()) {
@ -295,14 +294,14 @@ public class MessageListenerServlet extends MessageServletSupport {
}
}
}
// prepare the response
response.setContentType("text/xml");
response.setHeader("Cache-Control", "no-cache");
if (message == null && client.getListener().getUndeliveredMessages().size() == 0) {
Continuation continuation = ContinuationSupport.getContinuation(request);
if (continuation.isExpired()) {
response.setStatus(HttpServletResponse.SC_OK);
StringWriter swriter = new StringWriter();
@ -312,43 +311,43 @@ public class MessageListenerServlet extends MessageServletSupport {
writer.flush();
String m = swriter.toString();
response.getWriter().println(m);
response.getWriter().println(m);
return;
}
continuation.setTimeout(timeout);
continuation.suspend();
LOG.debug( "Suspending continuation " + continuation );
// Fetch the listeners
AjaxListener listener = client.getListener();
listener.access();
// register this continuation with our listener.
listener.setContinuation(continuation);
return;
}
StringWriter swriter = new StringWriter();
PrintWriter writer = new PrintWriter(swriter);
Map<MessageAvailableConsumer, String> consumerIdMap = client.getIdMap();
Map<MessageAvailableConsumer, String> consumerDestinationNameMap = client.getDestinationNameMap();
response.setStatus(HttpServletResponse.SC_OK);
writer.println("<ajax-response>");
// Send any message we already have
if (message != null) {
String id = consumerIdMap.get(consumer);
String destinationName = consumerDestinationNameMap.get(consumer);
LOG.debug( "sending pre-existing message" );
writeMessageResponse(writer, message, id, destinationName);
messages++;
}
// send messages buffered while continuation was unavailable.
LinkedList<UndeliveredAjaxMessage> undeliveredMessages = ((AjaxListener)consumer.getAvailableListener()).getUndeliveredMessages();
LOG.debug("Send " + undeliveredMessages.size() + " unconsumed messages");
@ -369,7 +368,7 @@ public class MessageListenerServlet extends MessageServletSupport {
}
}
}
// Send the rest of the messages
for (int i = 0; i < consumers.size() && messages < maximumMessages; i++) {
consumer = (MessageAvailableConsumer)consumers.get(i);
@ -422,22 +421,21 @@ public class MessageListenerServlet extends MessageServletSupport {
}
writer.println("</response>");
}
/*
* Return the AjaxWebClient for this session+clientId.
* Create one if it does not already exist.
*/
protected AjaxWebClient getAjaxWebClient( HttpServletRequest request ) {
long now = (new Date()).getTime();
HttpSession session = request.getSession(true);
String clientId = request.getParameter( "clientId" );
String clientId = request.getParameter( "clientId" );
// if user doesn't supply a 'clientId', we'll just use a default.
if( clientId == null ) {
clientId = "defaultAjaxWebClient";
}
String sessionKey = session.getId() + '-' + clientId;
AjaxWebClient client = ajaxWebClients.get( sessionKey );
synchronized (ajaxWebClients) {
// create a new AjaxWebClient if one does not already exist for this sessionKey.
@ -469,7 +467,7 @@ public class MessageListenerServlet extends MessageServletSupport {
}
return answer;
}
/*
* an instance of this class runs every minute (started in init), to clean up old web clients & free resources.
*/
@ -478,11 +476,11 @@ public class MessageListenerServlet extends MessageServletSupport {
if( LOG.isDebugEnabled() ) {
LOG.debug( "Cleaning up expired web clients." );
}
synchronized( ajaxWebClients ) {
Iterator it = ajaxWebClients.entrySet().iterator();
Iterator<Map.Entry<String, AjaxWebClient>> it = ajaxWebClients.entrySet().iterator();
while ( it.hasNext() ) {
Map.Entry<String,AjaxWebClient> e = (Map.Entry<String,AjaxWebClient>)it.next();
Map.Entry<String,AjaxWebClient> e = it.next();
String key = e.getKey();
AjaxWebClient val = e.getValue();
if ( LOG.isDebugEnabled() ) {
@ -499,4 +497,10 @@ public class MessageListenerServlet extends MessageServletSupport {
}
}
}
public void destroy() {
// make sure we cancel the timer
clientCleanupTimer.cancel();
super.destroy();
}
}