Issues arises when the StompTransport is sending an Ack back for Auto Ack 
mode in the same thread as the message was dispatched in.  If an incoming 
command beats the auto ack to onCommand in the activity monitor it will block
as the original thread is now waiting on the incoming command for the transport
lock.  Need to send back the Auto Acks in their own thread using a Task Runner
in the Stomp Transport so that the dispatch thread can complete and release
its lock on the transport.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1180070 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2011-10-07 15:22:31 +00:00
parent 216f74e003
commit 8cf318788d
5 changed files with 78 additions and 9 deletions

View File

@ -40,6 +40,7 @@ import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver;
import com.thoughtworks.xstream.io.xml.PrettyPrintWriter;
import com.thoughtworks.xstream.io.xml.XppReader;
import com.thoughtworks.xstream.io.xml.xppdom.XppFactory;
/**
* Frame translator implementation that uses XStream to convert messages to and
@ -67,7 +68,7 @@ public class JmsFrameTranslator extends LegacyFrameTranslator implements
String text = new String(command.getContent(), "UTF-8");
switch (Stomp.Transformations.getValue(transformation)) {
case JMS_OBJECT_XML:
in = new XppReader(new StringReader(text));
in = new XppReader(new StringReader(text), XppFactory.createDefaultParser());
msg = createObjectMessage(in);
break;
case JMS_OBJECT_JSON:
@ -75,7 +76,7 @@ public class JmsFrameTranslator extends LegacyFrameTranslator implements
msg = createObjectMessage(in);
break;
case JMS_MAP_XML:
in = new XppReader(new StringReader(text));
in = new XppReader(new StringReader(text), XppFactory.createDefaultParser());
msg = createMapMessage(in);
break;
case JMS_MAP_JSON:

View File

@ -75,7 +75,7 @@ public class StompSubscription {
}
} else if (ackMode == AUTO_ACK) {
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
protocolConverter.getStompTransport().sendToActiveMQ(ack);
protocolConverter.getStompTransport().asyncSendToActiveMQ(ack);
}
boolean ignoreTransformation = false;

View File

@ -29,6 +29,8 @@ public interface StompTransport {
public void sendToActiveMQ(Command command);
public void asyncSendToActiveMQ(Command command);
public void sendToStomp(StompFrame command) throws IOException;
public X509Certificate[] getPeerCertificates();

View File

@ -18,11 +18,15 @@ package org.apache.activemq.transport.stomp;
import java.io.IOException;
import java.security.cert.X509Certificate;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.jms.JMSException;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.command.Command;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportListener;
@ -46,21 +50,61 @@ public class StompTransportFilter extends TransportFilter implements StompTransp
private final ProtocolConverter protocolConverter;
private StompInactivityMonitor monitor;
private StompWireFormat wireFormat;
private final TaskRunner asyncSendTask;
private final ConcurrentLinkedQueue<Command> asyncCommands = new ConcurrentLinkedQueue<Command>();
private boolean trace;
private int maxAsyncBatchSize = 25;
public StompTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
super(next);
this.protocolConverter = new ProtocolConverter(this, brokerContext);
asyncSendTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
public boolean iterate() {
int iterations = 0;
TransportListener listener = transportListener;
if (listener != null) {
while (iterations++ < maxAsyncBatchSize && !asyncCommands.isEmpty()) {
Command command = asyncCommands.poll();
if (command != null) {
listener.onCommand(command);
}
}
}
return !asyncCommands.isEmpty();
}
}, "ActiveMQ StompTransport Async Worker: " + System.identityHashCode(this));
if (wireFormat instanceof StompWireFormat) {
this.wireFormat = (StompWireFormat) wireFormat;
}
}
public void stop() throws Exception {
asyncSendTask.shutdown();
TransportListener listener = transportListener;
if (listener != null) {
Command commands[] = new Command[0];
asyncCommands.toArray(commands);
asyncCommands.clear();
for(Command command : commands) {
try {
listener.onCommand(command);
} catch(Exception e) {
break;
}
}
}
super.stop();
}
public void oneway(Object o) throws IOException {
try {
final Command command = (Command)o;
final Command command = (Command) o;
protocolConverter.onActiveMQCommand(command);
} catch (JMSException e) {
throw IOExceptionSupport.create(e);
@ -73,7 +117,7 @@ public class StompTransportFilter extends TransportFilter implements StompTransp
TRACE.trace("Received: \n" + command);
}
protocolConverter.onStompCommand((StompFrame)command);
protocolConverter.onStompCommand((StompFrame) command);
} catch (IOException e) {
onException(e);
} catch (JMSException e) {
@ -83,24 +127,33 @@ public class StompTransportFilter extends TransportFilter implements StompTransp
public void sendToActiveMQ(Command command) {
TransportListener l = transportListener;
if (l!=null) {
if (l != null) {
l.onCommand(command);
}
}
public void asyncSendToActiveMQ(Command command) {
asyncCommands.offer(command);
try {
asyncSendTask.wakeup();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public void sendToStomp(StompFrame command) throws IOException {
if (trace) {
TRACE.trace("Sending: \n" + command);
}
Transport n = next;
if (n!=null) {
if (n != null) {
n.oneway(command);
}
}
public X509Certificate[] getPeerCertificates() {
if(next instanceof SslTransport) {
X509Certificate[] peerCerts = ((SslTransport)next).getPeerCertificates();
if (next instanceof SslTransport) {
X509Certificate[] peerCerts = ((SslTransport) next).getPeerCertificates();
if (trace && peerCerts != null) {
LOG.debug("Peer Identity has been verified\n");
}
@ -130,4 +183,12 @@ public class StompTransportFilter extends TransportFilter implements StompTransp
public StompWireFormat getWireFormat() {
return this.wireFormat;
}
public int getMaxAsyncBatchSize() {
return maxAsyncBatchSize;
}
public void setMaxAsyncBatchSize(int maxAsyncBatchSize) {
this.maxAsyncBatchSize = maxAsyncBatchSize;
}
}

View File

@ -101,4 +101,9 @@ class StompSocket extends TransportSupport implements WebSocket, StompTransport
public StompWireFormat getWireFormat() {
return this.wireFormat;
}
@Override
public void asyncSendToActiveMQ(Command command) {
doConsume(command);
}
}