diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java index e1b6f93525..ca2906d88e 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java @@ -26,6 +26,7 @@ import java.util.Map; import javax.jms.JMSException; +import com.thoughtworks.xstream.io.json.JsonHierarchicalStreamDriver; import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.BrokerContext; import org.apache.activemq.broker.BrokerContextAware; @@ -102,6 +103,7 @@ public class JmsFrameTranslator extends LegacyFrameTranslator implements @Override public StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException { + if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) { StompFrame command = new StompFrame(); command.setAction(Stomp.Responses.MESSAGE); @@ -153,6 +155,10 @@ public class JmsFrameTranslator extends LegacyFrameTranslator implements FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame( converter, message, command, this); + if (!headers.containsKey(Stomp.Headers.TRANSFORMATION)) { + headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_JSON.toString()); + } + if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) { headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_XML.toString()); } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) { @@ -274,4 +280,16 @@ public class JmsFrameTranslator extends LegacyFrameTranslator implements public void setBrokerContext(BrokerContext brokerContext) { this.brokerContext = brokerContext; } + + /** + * Return an Advisory message as a JSON formatted string + * @param ds + * @return + */ + protected String marshallAdvisory(final DataStructure ds) { + XStream xstream = new XStream(new JsonHierarchicalStreamDriver()); + xstream.setMode(XStream.NO_REFERENCES); + xstream.aliasPackage("", "org.apache.activemq.command"); + return xstream.toXML(ds); + } } diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java index 1d826a44e6..8cfa1219e4 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java @@ -16,25 +16,19 @@ */ package org.apache.activemq.transport.stomp; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import javax.jms.Destination; -import javax.jms.JMSException; - -import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.command.ActiveMQBytesMessage; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTextMessage; -import org.apache.activemq.command.DataStructure; import org.apache.activemq.util.ByteArrayOutputStream; import org.apache.activemq.util.ByteSequence; -import com.thoughtworks.xstream.XStream; -import com.thoughtworks.xstream.io.json.JsonHierarchicalStreamDriver; +import javax.jms.Destination; +import javax.jms.JMSException; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; /** * Implements ActiveMQ 4.0 translations @@ -127,15 +121,8 @@ public class LegacyFrameTranslator implements FrameTranslator { headers.put(Stomp.Headers.CONTENT_LENGTH, Integer.toString(data.length)); command.setContent(data); - } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE && - AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) { - - FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame( - converter, message, command, this); - - String body = marshallAdvisory(message.getDataStructure()); - command.setContent(body.getBytes("UTF-8")); } + return command; } @@ -212,15 +199,5 @@ public class LegacyFrameTranslator implements FrameTranslator { } } - /** - * Return an Advisory message as a JSON formatted string - * @param ds - * @return - */ - protected String marshallAdvisory(final DataStructure ds) { - XStream xstream = new XStream(new JsonHierarchicalStreamDriver()); - xstream.setMode(XStream.NO_REFERENCES); - xstream.aliasPackage("", "org.apache.activemq.command"); - return xstream.toXML(ds); - } + } diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java index a6a22f17f2..f31aad1b61 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.JMSException; import org.apache.activemq.ActiveMQPrefetchPolicy; +import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.BrokerContext; import org.apache.activemq.broker.BrokerContextAware; import org.apache.activemq.command.ActiveMQDestination; @@ -200,19 +201,28 @@ public class ProtocolConverter { } protected FrameTranslator findTranslator(String header) { + return findTranslator(header, null); + } + + protected FrameTranslator findTranslator(String header, ActiveMQDestination destination) { FrameTranslator translator = frameTranslator; try { if (header != null) { translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER .newInstance(header); - if (translator instanceof BrokerContextAware) { - ((BrokerContextAware)translator).setBrokerContext(brokerContext); + } else { + if (destination != null && AdvisorySupport.isAdvisoryTopic(destination)) { + translator = new JmsFrameTranslator(); } } } catch (Exception ignore) { // if anything goes wrong use the default translator } + if (translator instanceof BrokerContextAware) { + ((BrokerContextAware)translator).setBrokerContext(brokerContext); + } + return translator; } @@ -879,7 +889,7 @@ public class ProtocolConverter { if (ignoreTransformation == true) { return frameTranslator.convertMessage(this, message); } else { - return findTranslator(message.getStringProperty(Stomp.Headers.TRANSFORMATION)).convertMessage(this, message); + return findTranslator(message.getStringProperty(Stomp.Headers.TRANSFORMATION), message.getDestination()).convertMessage(this, message); } }