Apply patch to work around known race condition. 

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1494681 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-06-19 16:31:36 +00:00
parent 5415f3a6aa
commit 166ab43c90
2 changed files with 28 additions and 30 deletions

View File

@ -16,30 +16,27 @@
*/
package org.apache.activemq.web;
import java.util.LinkedList;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import org.apache.activemq.MessageAvailableListener;
import org.eclipse.jetty.continuation.Continuation;
import org.eclipse.jetty.continuation.ContinuationSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.activemq.MessageAvailableListener;
import java.util.LinkedList;
/*
* Listen for available messages and wakeup any continuations.
*/
public class AjaxListener implements MessageAvailableListener {
private static final Logger LOG = LoggerFactory.getLogger(AjaxListener.class);
private long maximumReadTimeout;
private AjaxWebClient client;
private final long maximumReadTimeout;
private final AjaxWebClient client;
private long lastAccess;
private Continuation continuation;
private LinkedList<UndeliveredAjaxMessage> undeliveredMessages = new LinkedList<UndeliveredAjaxMessage>();
private final LinkedList<UndeliveredAjaxMessage> undeliveredMessages = new LinkedList<UndeliveredAjaxMessage>();
AjaxListener(AjaxWebClient client, long maximumReadTimeout) {
this.client = client;
@ -58,7 +55,8 @@ public class AjaxListener implements MessageAvailableListener {
public LinkedList<UndeliveredAjaxMessage> getUndeliveredMessages() {
return undeliveredMessages;
}
@Override
public synchronized void onMessageAvailable(MessageConsumer consumer) {
if (LOG.isDebugEnabled()) {
LOG.debug("message for " + consumer + " continuation=" + continuation);
@ -66,23 +64,24 @@ public class AjaxListener implements MessageAvailableListener {
if (continuation != null) {
try {
Message message = consumer.receive(10);
LOG.debug( "message is " + message );
if( message != null ) {
if( continuation.isSuspended() ) {
LOG.debug( "Resuming suspended continuation " + continuation );
continuation.setAttribute("undelivered_message", new UndeliveredAjaxMessage( message, consumer ) );
LOG.debug("message is " + message);
if (message != null) {
if (!continuation.isResumed()) {
LOG.debug("Resuming suspended continuation " + continuation);
continuation.setAttribute("undelivered_message", new UndeliveredAjaxMessage(message, consumer));
continuation.resume();
} else {
LOG.debug( "Message available, but continuation is already resumed. Buffer for next time." );
bufferMessageForDelivery( message, consumer );
LOG.debug("Message available, but continuation is already resumed. Buffer for next time.");
bufferMessageForDelivery(message, consumer);
}
}
} catch (Exception e) {
LOG.error("Error receiving message " + e, e);
}
} else if (System.currentTimeMillis() - lastAccess > 2 * this.maximumReadTimeout) {
new Thread() {
@Override
public void run() {
client.closeConsumers();
};
@ -90,17 +89,17 @@ public class AjaxListener implements MessageAvailableListener {
} else {
try {
Message message = consumer.receive(10);
bufferMessageForDelivery( message, consumer );
bufferMessageForDelivery(message, consumer);
} catch (Exception e) {
LOG.error("Error receiving message " + e, e);
}
}
}
public void bufferMessageForDelivery( Message message, MessageConsumer consumer ) {
if( message != null ) {
synchronized( undeliveredMessages ) {
undeliveredMessages.addLast( new UndeliveredAjaxMessage( message, consumer ) );
public void bufferMessageForDelivery(Message message, MessageConsumer consumer) {
if (message != null) {
synchronized (undeliveredMessages) {
undeliveredMessages.addLast(new UndeliveredAjaxMessage(message, consumer));
}
}
}

View File

@ -313,20 +313,20 @@ public class MessageListenerServlet extends MessageServletSupport {
Continuation continuation = ContinuationSupport.getContinuation(request);
// Add a listener to the continuation to make sure it actually
// will expire (seems like a bug in Jetty Servlet 3 continuations,
// will expire (seems like a bug in Jetty Servlet 3 continuations,
// see https://issues.apache.org/jira/browse/AMQ-3447
continuation.addContinuationListener(new ContinuationListener() {
@Override
public void onTimeout(Continuation cont) {
if (LOG.isDebugEnabled()) {
LOG.debug("Continuation " + cont.toString() + " expired.");
LOG.debug("Continuation " + cont.toString() + " expired.");
}
}
@Override
public void onComplete(Continuation cont) {
if (LOG.isDebugEnabled()) {
LOG.debug("Continuation " + cont.toString() + " completed.");
LOG.debug("Continuation " + cont.toString() + " completed.");
}
}
});
@ -381,7 +381,7 @@ public class MessageListenerServlet extends MessageServletSupport {
LinkedList<UndeliveredAjaxMessage> undeliveredMessages = ((AjaxListener)consumer.getAvailableListener()).getUndeliveredMessages();
LOG.debug("Send " + undeliveredMessages.size() + " unconsumed messages");
synchronized( undeliveredMessages ) {
for (Iterator<UndeliveredAjaxMessage> it = undeliveredMessages.iterator(); it.hasNext(); ) {
for (Iterator<UndeliveredAjaxMessage> it = undeliveredMessages.iterator(); it.hasNext();) {
messages++;
UndeliveredAjaxMessage undelivered = it.next();
Message msg = undelivered.getMessage();
@ -425,7 +425,6 @@ public class MessageListenerServlet extends MessageServletSupport {
String m = swriter.toString();
response.getWriter().println(m);
}
}
protected void writeMessageResponse(PrintWriter writer, Message message, String id, String destinationName) throws JMSException, IOException {