From 533cedc4fc555b4df573c8e17594941019f5e50b Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Thu, 7 Aug 2014 14:18:40 -0400 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5220 Fixes empty message bodies from responses to statistics plugin queries over the STOMP transport. This closes #41 --- .../transport/stomp/FrameTranslator.java | 5 +- .../transport/stomp/JmsFrameTranslator.java | 172 +++++++++--------- .../transport/stomp/ProtocolConverter.java | 15 +- .../activemq/transport/stomp/Stomp.java | 5 + .../transport/stomp/StompAdvisoryTest.java | 58 ++++++ .../transport/stomp/StompTestSupport.java | 5 + 6 files changed, 168 insertions(+), 92 deletions(-) diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java index d37d364545..7496472f82 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java @@ -27,12 +27,13 @@ import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; /** - * Implementations of this interface are used to map back and forth from Stomp + * Implementations of this interface are used to map back and forth from STOMP * to ActiveMQ. There are several standard mappings which are semantically the * same, the inner class, Helper, provides functions to copy those properties * from one to the other */ public interface FrameTranslator { + ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame frame) throws JMSException, ProtocolException; StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException; @@ -142,7 +143,7 @@ public interface FrameTranslator { msg.setPersistent("true".equals(o)); } - // Stomp specific headers + // STOMP specific headers headers.remove(Stomp.Headers.RECEIPT_REQUESTED); // Since we take the rest of the header and put them in properties which could then diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java index 6ae68fc978..3525b23305 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java @@ -16,6 +16,9 @@ */ package org.apache.activemq.transport.stomp; +import static org.apache.activemq.transport.stomp.FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage; +import static org.apache.activemq.transport.stomp.FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame; + import java.io.IOException; import java.io.Serializable; import java.io.StringReader; @@ -33,6 +36,9 @@ import org.apache.activemq.command.ActiveMQMapMessage; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQObjectMessage; import org.apache.activemq.command.DataStructure; +import org.apache.activemq.transport.stomp.Stomp.Headers; +import org.apache.activemq.transport.stomp.Stomp.Responses; +import org.apache.activemq.transport.stomp.Stomp.Transformations; import org.codehaus.jettison.mapped.Configuration; import org.fusesource.hawtbuf.UTF8Buffer; @@ -49,133 +55,129 @@ import com.thoughtworks.xstream.io.xml.xppdom.XppFactory; /** * Frame translator implementation that uses XStream to convert messages to and * from XML and JSON - * - * @author Dejan Bosanac */ -public class JmsFrameTranslator extends LegacyFrameTranslator implements - BrokerContextAware { +public class JmsFrameTranslator extends LegacyFrameTranslator implements BrokerContextAware { XStream xStream = null; BrokerContext brokerContext; @Override - public ActiveMQMessage convertFrame(ProtocolConverter converter, - StompFrame command) throws JMSException, ProtocolException { + public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command) throws JMSException, ProtocolException { Map headers = command.getHeaders(); ActiveMQMessage msg; - String transformation = headers.get(Stomp.Headers.TRANSFORMATION); - if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH) || transformation.equals(Stomp.Transformations.JMS_BYTE.toString())) { + String transformation = headers.get(Headers.TRANSFORMATION); + if (headers.containsKey(Headers.CONTENT_LENGTH) || transformation.equals(Transformations.JMS_BYTE.toString())) { msg = super.convertFrame(converter, command); } else { HierarchicalStreamReader in; try { String text = new String(command.getContent(), "UTF-8"); - switch (Stomp.Transformations.getValue(transformation)) { - case JMS_OBJECT_XML: - in = new XppReader(new StringReader(text), XppFactory.createDefaultParser()); - msg = createObjectMessage(in); - break; - case JMS_OBJECT_JSON: - in = new JettisonMappedXmlDriver().createReader(new StringReader(text)); - msg = createObjectMessage(in); - break; - case JMS_MAP_XML: - in = new XppReader(new StringReader(text), XppFactory.createDefaultParser()); - msg = createMapMessage(in); - break; - case JMS_MAP_JSON: - in = new JettisonMappedXmlDriver().createReader(new StringReader(text)); - msg = createMapMessage(in); - break; - default: - throw new Exception("Unkown transformation: " + transformation); + switch (Transformations.getValue(transformation)) { + case JMS_OBJECT_XML: + in = new XppReader(new StringReader(text), XppFactory.createDefaultParser()); + msg = createObjectMessage(in); + break; + case JMS_OBJECT_JSON: + in = new JettisonMappedXmlDriver().createReader(new StringReader(text)); + msg = createObjectMessage(in); + break; + case JMS_MAP_XML: + in = new XppReader(new StringReader(text), XppFactory.createDefaultParser()); + msg = createMapMessage(in); + break; + case JMS_MAP_JSON: + in = new JettisonMappedXmlDriver().createReader(new StringReader(text)); + msg = createMapMessage(in); + break; + default: + throw new Exception("Unkown transformation: " + transformation); } } catch (Throwable e) { - command.getHeaders().put(Stomp.Headers.TRANSFORMATION_ERROR, e.getMessage()); + command.getHeaders().put(Headers.TRANSFORMATION_ERROR, e.getMessage()); msg = super.convertFrame(converter, command); } } - FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this); + + copyStandardHeadersFromFrameToMessage(converter, command, msg, this); return msg; } @Override - public StompFrame convertMessage(ProtocolConverter converter, - ActiveMQMessage message) throws IOException, JMSException { + public StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException { + + StompFrame command = new StompFrame(); + command.setAction(Responses.MESSAGE); + Map headers = new HashMap(25); + command.setHeaders(headers); + + copyStandardHeadersFromMessageToFrame(converter, message, command, this); + + String transformation = headers.get(Headers.TRANSFORMATION); if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) { - StompFrame command = new StompFrame(); - command.setAction(Stomp.Responses.MESSAGE); - Map headers = new HashMap(25); - command.setHeaders(headers); - FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame( - converter, message, command, this); + if (Transformations.JMS_XML.equals(transformation)) { + headers.put(Headers.TRANSFORMATION, Transformations.JMS_OBJECT_XML.toString()); + } else if (Transformations.JMS_JSON.equals(transformation)) { + headers.put(Headers.TRANSFORMATION, Transformations.JMS_OBJECT_JSON.toString()); + } - if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) { - headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_XML.toString()); - } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) { - headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_JSON.toString()); + if (!headers.containsKey(Headers.TRANSFORMATION)) { + headers.put(Headers.TRANSFORMATION, Transformations.JMS_OBJECT_XML.toString()); } ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy(); - command.setContent(marshall(msg.getObject(), - headers.get(Stomp.Headers.TRANSFORMATION)) - .getBytes("UTF-8")); - return command; + command.setContent(marshall(msg.getObject(), headers.get(Headers.TRANSFORMATION)).getBytes("UTF-8")); } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) { - StompFrame command = new StompFrame(); - command.setAction(Stomp.Responses.MESSAGE); - Map headers = new HashMap(25); - command.setHeaders(headers); - FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame( - converter, message, command, this); + if (Transformations.JMS_XML.equals(transformation)) { + headers.put(Headers.TRANSFORMATION, Transformations.JMS_MAP_XML.toString()); + } else if (Transformations.JMS_JSON.equals(transformation)) { + headers.put(Headers.TRANSFORMATION, Transformations.JMS_MAP_JSON.toString()); + } - if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) { - headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_XML.toString()); - } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) { - headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_JSON.toString()); + if (!headers.containsKey(Headers.TRANSFORMATION)) { + headers.put(Headers.TRANSFORMATION, Transformations.JMS_MAP_XML.toString()); } ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy(); - command.setContent(marshall((Serializable)msg.getContentMap(), - headers.get(Stomp.Headers.TRANSFORMATION)).getBytes("UTF-8")); - return command; - } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE && - AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) { + command.setContent(marshall((Serializable) msg.getContentMap(), headers.get(Headers.TRANSFORMATION)).getBytes("UTF-8")); - StompFrame command = new StompFrame(); - command.setAction(Stomp.Responses.MESSAGE); - Map headers = new HashMap(25); - command.setHeaders(headers); + } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE && AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) { - FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame( - converter, message, command, this); - - if (!headers.containsKey(Stomp.Headers.TRANSFORMATION)) { - headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_JSON.toString()); + if (Transformations.JMS_XML.equals(transformation)) { + headers.put(Headers.TRANSFORMATION, Transformations.JMS_ADVISORY_XML.toString()); + } else if (Transformations.JMS_JSON.equals(transformation)) { + headers.put(Headers.TRANSFORMATION, Transformations.JMS_ADVISORY_JSON.toString()); } - if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) { - headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_XML.toString()); - } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) { - headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_JSON.toString()); + if (!headers.containsKey(Headers.TRANSFORMATION)) { + headers.put(Headers.TRANSFORMATION, Transformations.JMS_ADVISORY_JSON.toString()); } - String body = marshallAdvisory(message.getDataStructure(), - headers.get(Stomp.Headers.TRANSFORMATION)); + String body = marshallAdvisory(message.getDataStructure(), headers.get(Headers.TRANSFORMATION)); command.setContent(body.getBytes("UTF-8")); - return command; + } else { - return super.convertMessage(converter, message); + command = super.convertMessage(converter, message); } + + return command; } /** - * Marshalls the Object to a string using XML or JSON encoding + * Marshal the Object to a string using XML or JSON encoding + * + * @param object + * the object to marshal + * @param transformation + * the transformation to apply to the object. + * + * @returns the marshaled form of the given object, in JSON or XML. + * + * @throws JMSException if an error occurs during the marshal operation. */ protected String marshall(Serializable object, String transformation) throws JMSException { StringWriter buffer = new StringWriter(); @@ -199,7 +201,7 @@ public class JmsFrameTranslator extends LegacyFrameTranslator implements @SuppressWarnings("unchecked") protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException { ActiveMQMapMessage mapMsg = new ActiveMQMapMessage(); - Map map = (Map)getXStream().unmarshal(in); + Map map = (Map) getXStream().unmarshal(in); for (String key : map.keySet()) { mapMsg.setObject(key, map.get(key)); } @@ -256,8 +258,9 @@ public class JmsFrameTranslator extends LegacyFrameTranslator implements xstream.ignoreUnknownElements(); } - // For any object whose elements contains an UTF8Buffer instance instead of a String - // type we map it to String both in and out such that we don't marshal UTF8Buffers out + // For any object whose elements contains an UTF8Buffer instance instead + // of a String type we map it to String both in and out such that we don't + // marshal UTF8Buffers out xstream.registerConverter(new AbstractSingleValueConverter() { @Override @@ -283,14 +286,17 @@ public class JmsFrameTranslator extends LegacyFrameTranslator implements } @Override - public BrokerContext getBrokerContext() { + public BrokerContext getBrokerContext() { return this.brokerContext; } /** * Return an Advisory message as a JSON formatted string + * * @param ds - * @return + * the DataStructure instance that is being marshaled. + * + * @return the JSON marshaled form of the given DataStructure instance. */ protected String marshallAdvisory(final DataStructure ds) { XStream xstream = new XStream(new JsonHierarchicalStreamDriver()); diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java index 0ed08e4419..edefb15aec 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java @@ -204,17 +204,16 @@ public class ProtocolConverter { } protected FrameTranslator findTranslator(String header) { - return findTranslator(header, null); + return findTranslator(header, null, false); } - protected FrameTranslator findTranslator(String header, ActiveMQDestination destination) { + protected FrameTranslator findTranslator(String header, ActiveMQDestination destination, boolean advisory) { FrameTranslator translator = frameTranslator; try { if (header != null) { - translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER - .newInstance(header); + translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER.newInstance(header); } else { - if (destination != null && AdvisorySupport.isAdvisoryTopic(destination)) { + if (destination != null && (advisory || AdvisorySupport.isAdvisoryTopic(destination))) { translator = new JmsFrameTranslator(); } } @@ -230,7 +229,7 @@ public class ProtocolConverter { } /** - * Convert a stomp command + * Convert a STOMP command * * @param command */ @@ -894,7 +893,9 @@ public class ProtocolConverter { if (ignoreTransformation == true) { return frameTranslator.convertMessage(this, message); } else { - return findTranslator(message.getStringProperty(Stomp.Headers.TRANSFORMATION), message.getDestination()).convertMessage(this, message); + FrameTranslator translator = findTranslator( + message.getStringProperty(Stomp.Headers.TRANSFORMATION), message.getDestination(), message.isAdvisory()); + return translator.convertMessage(this, message); } } diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/Stomp.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/Stomp.java index a66b5eecdc..767e947b5c 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/Stomp.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/Stomp.java @@ -176,10 +176,15 @@ public interface Stomp { JMS_ADVISORY_XML, JMS_ADVISORY_JSON; + @Override public String toString() { return name().replaceAll("_", "-").toLowerCase(Locale.ENGLISH); } + public boolean equals(String value) { + return toString().equals(value); + } + public static Transformations getValue(String value) { return valueOf(value.replaceAll("-", "_").toUpperCase(Locale.ENGLISH)); } diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java index 10d09b0033..cc78308497 100644 --- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java +++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java @@ -22,28 +22,40 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.util.HashMap; +import java.util.List; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; +import javax.jms.Queue; import javax.jms.Session; +import javax.jms.Topic; import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.plugin.StatisticsBrokerPlugin; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class StompAdvisoryTest extends StompTestSupport { + static final String STATS_DESTINATION_PREFIX = "ActiveMQ.Statistics.Destination"; + private static final Logger LOG = LoggerFactory.getLogger(StompAdvisoryTest.class); protected ActiveMQConnection connection; + @Override + protected void addAdditionalPlugins(List plugins) throws Exception { + plugins.add(new StatisticsBrokerPlugin()); + } + @Override protected void applyBrokerPolicies() throws Exception { @@ -269,4 +281,50 @@ public class StompAdvisoryTest extends StompTestSupport { c.stop(); c.close(); } + + @Test + public void testStatisticsAdvisory() throws Exception { + Connection c = cf.createConnection("system", "manager"); + c.start(); + final Session session = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Topic replyTo = session.createTopic("stats"); + + // Dummy Queue used to later gather statistics. + final ActiveMQQueue testQueue = new ActiveMQQueue("queueToBeTestedForStats"); + final MessageProducer producer = session.createProducer(null); + Message mess = session.createTextMessage("test"); + producer.send(testQueue, mess); + + // Create a request for Queue statistics + Thread child = new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(1000); + Queue query = session.createQueue(STATS_DESTINATION_PREFIX + testQueue.getQueueName()); + Message msg = session.createMessage(); + msg.setJMSReplyTo(replyTo); + producer.send(query, msg); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + child.start(); + + // Attempt to gather the statistics response from the previous request. + stompConnection.connect("system", "manager"); + stompConnection.subscribe("/topic/" + replyTo.getTopicName(), Stomp.Headers.Subscribe.AckModeValues.AUTO); + stompConnection.begin("TX"); + StompFrame f = stompConnection.receive(5000); + stompConnection.commit("TX"); + + LOG.debug(f.toString()); + assertEquals(f.getAction(),"MESSAGE"); + assertTrue("Should have a body", f.getBody().length() > 0); + assertTrue("Should contains memoryUsage stats", f.getBody().contains("memoryUsage")); + + c.stop(); + c.close(); + } } diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java index 3cf1356d95..e763552028 100644 --- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java +++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java @@ -146,6 +146,8 @@ public class StompTestSupport { plugins.add(configureAuthentication()); } + addAdditionalPlugins(plugins); + if (!plugins.isEmpty()) { BrokerPlugin[] array = new BrokerPlugin[plugins.size()]; brokerService.setPlugins(plugins.toArray(array)); @@ -172,6 +174,9 @@ public class StompTestSupport { brokerService.setJobSchedulerStore(jobStore); } + protected void addAdditionalPlugins(List plugins) throws Exception { + } + protected BrokerPlugin configureAuthentication() throws Exception { List users = new ArrayList(); users.add(new AuthenticationUser("system", "manager", "users,admins"));