Fixes empty message bodies from responses to statistics plugin queries
over the STOMP transport. 

This closes #41
This commit is contained in:
Timothy Bish 2014-08-07 14:18:40 -04:00
parent 960186af89
commit 533cedc4fc
6 changed files with 168 additions and 92 deletions

View File

@ -27,12 +27,13 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQMessage;
/** /**
* Implementations of this interface are used to map back and forth from Stomp * Implementations of this interface are used to map back and forth from STOMP
* to ActiveMQ. There are several standard mappings which are semantically the * to ActiveMQ. There are several standard mappings which are semantically the
* same, the inner class, Helper, provides functions to copy those properties * same, the inner class, Helper, provides functions to copy those properties
* from one to the other * from one to the other
*/ */
public interface FrameTranslator { public interface FrameTranslator {
ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame frame) throws JMSException, ProtocolException; ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame frame) throws JMSException, ProtocolException;
StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException; StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException;
@ -142,7 +143,7 @@ public interface FrameTranslator {
msg.setPersistent("true".equals(o)); msg.setPersistent("true".equals(o));
} }
// Stomp specific headers // STOMP specific headers
headers.remove(Stomp.Headers.RECEIPT_REQUESTED); headers.remove(Stomp.Headers.RECEIPT_REQUESTED);
// Since we take the rest of the header and put them in properties which could then // Since we take the rest of the header and put them in properties which could then

View File

@ -16,6 +16,9 @@
*/ */
package org.apache.activemq.transport.stomp; package org.apache.activemq.transport.stomp;
import static org.apache.activemq.transport.stomp.FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage;
import static org.apache.activemq.transport.stomp.FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.io.StringReader; import java.io.StringReader;
@ -33,6 +36,9 @@ import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQObjectMessage; import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.DataStructure; import org.apache.activemq.command.DataStructure;
import org.apache.activemq.transport.stomp.Stomp.Headers;
import org.apache.activemq.transport.stomp.Stomp.Responses;
import org.apache.activemq.transport.stomp.Stomp.Transformations;
import org.codehaus.jettison.mapped.Configuration; import org.codehaus.jettison.mapped.Configuration;
import org.fusesource.hawtbuf.UTF8Buffer; import org.fusesource.hawtbuf.UTF8Buffer;
@ -49,29 +55,25 @@ import com.thoughtworks.xstream.io.xml.xppdom.XppFactory;
/** /**
* Frame translator implementation that uses XStream to convert messages to and * Frame translator implementation that uses XStream to convert messages to and
* from XML and JSON * from XML and JSON
*
* @author <a href="mailto:dejan@nighttale.net">Dejan Bosanac</a>
*/ */
public class JmsFrameTranslator extends LegacyFrameTranslator implements public class JmsFrameTranslator extends LegacyFrameTranslator implements BrokerContextAware {
BrokerContextAware {
XStream xStream = null; XStream xStream = null;
BrokerContext brokerContext; BrokerContext brokerContext;
@Override @Override
public ActiveMQMessage convertFrame(ProtocolConverter converter, public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command) throws JMSException, ProtocolException {
StompFrame command) throws JMSException, ProtocolException {
Map<String, String> headers = command.getHeaders(); Map<String, String> headers = command.getHeaders();
ActiveMQMessage msg; ActiveMQMessage msg;
String transformation = headers.get(Stomp.Headers.TRANSFORMATION); String transformation = headers.get(Headers.TRANSFORMATION);
if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH) || transformation.equals(Stomp.Transformations.JMS_BYTE.toString())) { if (headers.containsKey(Headers.CONTENT_LENGTH) || transformation.equals(Transformations.JMS_BYTE.toString())) {
msg = super.convertFrame(converter, command); msg = super.convertFrame(converter, command);
} else { } else {
HierarchicalStreamReader in; HierarchicalStreamReader in;
try { try {
String text = new String(command.getContent(), "UTF-8"); String text = new String(command.getContent(), "UTF-8");
switch (Stomp.Transformations.getValue(transformation)) { switch (Transformations.getValue(transformation)) {
case JMS_OBJECT_XML: case JMS_OBJECT_XML:
in = new XppReader(new StringReader(text), XppFactory.createDefaultParser()); in = new XppReader(new StringReader(text), XppFactory.createDefaultParser());
msg = createObjectMessage(in); msg = createObjectMessage(in);
@ -92,90 +94,90 @@ public class JmsFrameTranslator extends LegacyFrameTranslator implements
throw new Exception("Unkown transformation: " + transformation); throw new Exception("Unkown transformation: " + transformation);
} }
} catch (Throwable e) { } catch (Throwable e) {
command.getHeaders().put(Stomp.Headers.TRANSFORMATION_ERROR, e.getMessage()); command.getHeaders().put(Headers.TRANSFORMATION_ERROR, e.getMessage());
msg = super.convertFrame(converter, command); msg = super.convertFrame(converter, command);
} }
} }
FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
return msg; return msg;
} }
@Override @Override
public StompFrame convertMessage(ProtocolConverter converter, public StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException {
ActiveMQMessage message) throws IOException, JMSException {
if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) {
StompFrame command = new StompFrame(); StompFrame command = new StompFrame();
command.setAction(Stomp.Responses.MESSAGE); command.setAction(Responses.MESSAGE);
Map<String, String> headers = new HashMap<String, String>(25); Map<String, String> headers = new HashMap<String, String>(25);
command.setHeaders(headers); command.setHeaders(headers);
FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame( copyStandardHeadersFromMessageToFrame(converter, message, command, this);
converter, message, command, this);
if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) { String transformation = headers.get(Headers.TRANSFORMATION);
headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_XML.toString());
} else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) { if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) {
headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_JSON.toString());
if (Transformations.JMS_XML.equals(transformation)) {
headers.put(Headers.TRANSFORMATION, Transformations.JMS_OBJECT_XML.toString());
} else if (Transformations.JMS_JSON.equals(transformation)) {
headers.put(Headers.TRANSFORMATION, Transformations.JMS_OBJECT_JSON.toString());
}
if (!headers.containsKey(Headers.TRANSFORMATION)) {
headers.put(Headers.TRANSFORMATION, Transformations.JMS_OBJECT_XML.toString());
} }
ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy(); ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy();
command.setContent(marshall(msg.getObject(), command.setContent(marshall(msg.getObject(), headers.get(Headers.TRANSFORMATION)).getBytes("UTF-8"));
headers.get(Stomp.Headers.TRANSFORMATION))
.getBytes("UTF-8"));
return command;
} else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) { } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
StompFrame command = new StompFrame();
command.setAction(Stomp.Responses.MESSAGE);
Map<String, String> headers = new HashMap<String, String>(25);
command.setHeaders(headers);
FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame( if (Transformations.JMS_XML.equals(transformation)) {
converter, message, command, this); headers.put(Headers.TRANSFORMATION, Transformations.JMS_MAP_XML.toString());
} else if (Transformations.JMS_JSON.equals(transformation)) {
headers.put(Headers.TRANSFORMATION, Transformations.JMS_MAP_JSON.toString());
}
if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) { if (!headers.containsKey(Headers.TRANSFORMATION)) {
headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_XML.toString()); headers.put(Headers.TRANSFORMATION, Transformations.JMS_MAP_XML.toString());
} else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_JSON.toString());
} }
ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy(); ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
command.setContent(marshall((Serializable)msg.getContentMap(), command.setContent(marshall((Serializable) msg.getContentMap(), headers.get(Headers.TRANSFORMATION)).getBytes("UTF-8"));
headers.get(Stomp.Headers.TRANSFORMATION)).getBytes("UTF-8"));
return command;
} else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE &&
AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
StompFrame command = new StompFrame(); } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE && AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
command.setAction(Stomp.Responses.MESSAGE);
Map<String, String> headers = new HashMap<String, String>(25);
command.setHeaders(headers);
FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame( if (Transformations.JMS_XML.equals(transformation)) {
converter, message, command, this); headers.put(Headers.TRANSFORMATION, Transformations.JMS_ADVISORY_XML.toString());
} else if (Transformations.JMS_JSON.equals(transformation)) {
if (!headers.containsKey(Stomp.Headers.TRANSFORMATION)) { headers.put(Headers.TRANSFORMATION, Transformations.JMS_ADVISORY_JSON.toString());
headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_JSON.toString());
} }
if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) { if (!headers.containsKey(Headers.TRANSFORMATION)) {
headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_XML.toString()); headers.put(Headers.TRANSFORMATION, Transformations.JMS_ADVISORY_JSON.toString());
} else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_JSON.toString());
} }
String body = marshallAdvisory(message.getDataStructure(), String body = marshallAdvisory(message.getDataStructure(), headers.get(Headers.TRANSFORMATION));
headers.get(Stomp.Headers.TRANSFORMATION));
command.setContent(body.getBytes("UTF-8")); command.setContent(body.getBytes("UTF-8"));
return command;
} else { } else {
return super.convertMessage(converter, message); command = super.convertMessage(converter, message);
} }
return command;
} }
/** /**
* Marshalls the Object to a string using XML or JSON encoding * Marshal the Object to a string using XML or JSON encoding
*
* @param object
* the object to marshal
* @param transformation
* the transformation to apply to the object.
*
* @returns the marshaled form of the given object, in JSON or XML.
*
* @throws JMSException if an error occurs during the marshal operation.
*/ */
protected String marshall(Serializable object, String transformation) throws JMSException { protected String marshall(Serializable object, String transformation) throws JMSException {
StringWriter buffer = new StringWriter(); StringWriter buffer = new StringWriter();
@ -199,7 +201,7 @@ public class JmsFrameTranslator extends LegacyFrameTranslator implements
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException { protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException {
ActiveMQMapMessage mapMsg = new ActiveMQMapMessage(); ActiveMQMapMessage mapMsg = new ActiveMQMapMessage();
Map<String, Object> map = (Map<String, Object>)getXStream().unmarshal(in); Map<String, Object> map = (Map<String, Object>) getXStream().unmarshal(in);
for (String key : map.keySet()) { for (String key : map.keySet()) {
mapMsg.setObject(key, map.get(key)); mapMsg.setObject(key, map.get(key));
} }
@ -256,8 +258,9 @@ public class JmsFrameTranslator extends LegacyFrameTranslator implements
xstream.ignoreUnknownElements(); xstream.ignoreUnknownElements();
} }
// For any object whose elements contains an UTF8Buffer instance instead of a String // For any object whose elements contains an UTF8Buffer instance instead
// type we map it to String both in and out such that we don't marshal UTF8Buffers out // of a String type we map it to String both in and out such that we don't
// marshal UTF8Buffers out
xstream.registerConverter(new AbstractSingleValueConverter() { xstream.registerConverter(new AbstractSingleValueConverter() {
@Override @Override
@ -289,8 +292,11 @@ public class JmsFrameTranslator extends LegacyFrameTranslator implements
/** /**
* Return an Advisory message as a JSON formatted string * Return an Advisory message as a JSON formatted string
*
* @param ds * @param ds
* @return * the DataStructure instance that is being marshaled.
*
* @return the JSON marshaled form of the given DataStructure instance.
*/ */
protected String marshallAdvisory(final DataStructure ds) { protected String marshallAdvisory(final DataStructure ds) {
XStream xstream = new XStream(new JsonHierarchicalStreamDriver()); XStream xstream = new XStream(new JsonHierarchicalStreamDriver());

View File

@ -204,17 +204,16 @@ public class ProtocolConverter {
} }
protected FrameTranslator findTranslator(String header) { protected FrameTranslator findTranslator(String header) {
return findTranslator(header, null); return findTranslator(header, null, false);
} }
protected FrameTranslator findTranslator(String header, ActiveMQDestination destination) { protected FrameTranslator findTranslator(String header, ActiveMQDestination destination, boolean advisory) {
FrameTranslator translator = frameTranslator; FrameTranslator translator = frameTranslator;
try { try {
if (header != null) { if (header != null) {
translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER.newInstance(header);
.newInstance(header);
} else { } else {
if (destination != null && AdvisorySupport.isAdvisoryTopic(destination)) { if (destination != null && (advisory || AdvisorySupport.isAdvisoryTopic(destination))) {
translator = new JmsFrameTranslator(); translator = new JmsFrameTranslator();
} }
} }
@ -230,7 +229,7 @@ public class ProtocolConverter {
} }
/** /**
* Convert a stomp command * Convert a STOMP command
* *
* @param command * @param command
*/ */
@ -894,7 +893,9 @@ public class ProtocolConverter {
if (ignoreTransformation == true) { if (ignoreTransformation == true) {
return frameTranslator.convertMessage(this, message); return frameTranslator.convertMessage(this, message);
} else { } else {
return findTranslator(message.getStringProperty(Stomp.Headers.TRANSFORMATION), message.getDestination()).convertMessage(this, message); FrameTranslator translator = findTranslator(
message.getStringProperty(Stomp.Headers.TRANSFORMATION), message.getDestination(), message.isAdvisory());
return translator.convertMessage(this, message);
} }
} }

View File

@ -176,10 +176,15 @@ public interface Stomp {
JMS_ADVISORY_XML, JMS_ADVISORY_XML,
JMS_ADVISORY_JSON; JMS_ADVISORY_JSON;
@Override
public String toString() { public String toString() {
return name().replaceAll("_", "-").toLowerCase(Locale.ENGLISH); return name().replaceAll("_", "-").toLowerCase(Locale.ENGLISH);
} }
public boolean equals(String value) {
return toString().equals(value);
}
public static Transformations getValue(String value) { public static Transformations getValue(String value) {
return valueOf(value.replaceAll("-", "_").toUpperCase(Locale.ENGLISH)); return valueOf(value.replaceAll("-", "_").toUpperCase(Locale.ENGLISH));
} }

View File

@ -22,28 +22,40 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.Message; import javax.jms.Message;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy; import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.plugin.StatisticsBrokerPlugin;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class StompAdvisoryTest extends StompTestSupport { public class StompAdvisoryTest extends StompTestSupport {
static final String STATS_DESTINATION_PREFIX = "ActiveMQ.Statistics.Destination";
private static final Logger LOG = LoggerFactory.getLogger(StompAdvisoryTest.class); private static final Logger LOG = LoggerFactory.getLogger(StompAdvisoryTest.class);
protected ActiveMQConnection connection; protected ActiveMQConnection connection;
@Override
protected void addAdditionalPlugins(List<BrokerPlugin> plugins) throws Exception {
plugins.add(new StatisticsBrokerPlugin());
}
@Override @Override
protected void applyBrokerPolicies() throws Exception { protected void applyBrokerPolicies() throws Exception {
@ -269,4 +281,50 @@ public class StompAdvisoryTest extends StompTestSupport {
c.stop(); c.stop();
c.close(); c.close();
} }
@Test
public void testStatisticsAdvisory() throws Exception {
Connection c = cf.createConnection("system", "manager");
c.start();
final Session session = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Topic replyTo = session.createTopic("stats");
// Dummy Queue used to later gather statistics.
final ActiveMQQueue testQueue = new ActiveMQQueue("queueToBeTestedForStats");
final MessageProducer producer = session.createProducer(null);
Message mess = session.createTextMessage("test");
producer.send(testQueue, mess);
// Create a request for Queue statistics
Thread child = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
Queue query = session.createQueue(STATS_DESTINATION_PREFIX + testQueue.getQueueName());
Message msg = session.createMessage();
msg.setJMSReplyTo(replyTo);
producer.send(query, msg);
} catch (Exception e) {
e.printStackTrace();
}
}
});
child.start();
// Attempt to gather the statistics response from the previous request.
stompConnection.connect("system", "manager");
stompConnection.subscribe("/topic/" + replyTo.getTopicName(), Stomp.Headers.Subscribe.AckModeValues.AUTO);
stompConnection.begin("TX");
StompFrame f = stompConnection.receive(5000);
stompConnection.commit("TX");
LOG.debug(f.toString());
assertEquals(f.getAction(),"MESSAGE");
assertTrue("Should have a body", f.getBody().length() > 0);
assertTrue("Should contains memoryUsage stats", f.getBody().contains("memoryUsage"));
c.stop();
c.close();
}
} }

View File

@ -146,6 +146,8 @@ public class StompTestSupport {
plugins.add(configureAuthentication()); plugins.add(configureAuthentication());
} }
addAdditionalPlugins(plugins);
if (!plugins.isEmpty()) { if (!plugins.isEmpty()) {
BrokerPlugin[] array = new BrokerPlugin[plugins.size()]; BrokerPlugin[] array = new BrokerPlugin[plugins.size()];
brokerService.setPlugins(plugins.toArray(array)); brokerService.setPlugins(plugins.toArray(array));
@ -172,6 +174,9 @@ public class StompTestSupport {
brokerService.setJobSchedulerStore(jobStore); brokerService.setJobSchedulerStore(jobStore);
} }
protected void addAdditionalPlugins(List<BrokerPlugin> plugins) throws Exception {
}
protected BrokerPlugin configureAuthentication() throws Exception { protected BrokerPlugin configureAuthentication() throws Exception {
List<AuthenticationUser> users = new ArrayList<AuthenticationUser>(); List<AuthenticationUser> users = new ArrayList<AuthenticationUser>();
users.add(new AuthenticationUser("system", "manager", "users,admins")); users.add(new AuthenticationUser("system", "manager", "users,admins"));