diff --git a/activemq-core/pom.xml b/activemq-core/pom.xml index 6ac40731da..17596513bc 100755 --- a/activemq-core/pom.xml +++ b/activemq-core/pom.xml @@ -108,6 +108,12 @@ commons-pool true + + com.thoughtworks.xstream + xstream + 1.2.2 + true + @@ -169,6 +175,18 @@ false test-jar + + org.codehaus.jettison + jettison + 1.0-RC1 + test + + + stax + stax-api + 1.0.1 + test + diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java index 3dafc36658..7eca4d8514 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java @@ -52,6 +52,7 @@ import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionInfo; import org.apache.activemq.util.ByteArrayOutputStream; +import org.apache.activemq.util.FactoryFinder; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.IntrospectionSupport; @@ -84,6 +85,7 @@ public class ProtocolConverter { private int lastCommandId; private final AtomicBoolean connected = new AtomicBoolean(false); private final FrameTranslator frameTranslator; + private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/"); public ProtocolConverter(StompTransportFilter stompTransportFilter, FrameTranslator translator) { this.transportFilter = stompTransportFilter; @@ -131,12 +133,26 @@ public class ProtocolConverter { protected void sendToStomp(StompFrame command) throws IOException { transportFilter.sendToStomp(command); } + + protected FrameTranslator findTranslator(String header) { + FrameTranslator translator = frameTranslator; + try { + if (header != null) { + translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER + .newInstance(header); + } + } catch (Exception ignore) { + // if anything goes wrong use the default translator + } + + return translator; + } /** - * Convert a stomp command - * - * @param command - */ + * Convert a stomp command + * + * @param command + */ public void onStompCommad(StompFrame command) throws IOException, JMSException { try { @@ -340,12 +356,13 @@ public class ProtocolConverter { protected void onStompSubscribe(StompFrame command) throws ProtocolException { checkConnected(); + FrameTranslator translator = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)); Map headers = command.getHeaders(); String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID); String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION); - ActiveMQDestination actualDest = frameTranslator.convertDestination(this, destination); + ActiveMQDestination actualDest = translator.convertDestination(this, destination); ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); ConsumerInfo consumerInfo = new ConsumerInfo(id); consumerInfo.setPrefetchSize(1000); @@ -356,9 +373,9 @@ public class ProtocolConverter { IntrospectionSupport.setProperties(consumerInfo, headers, "activemq."); - consumerInfo.setDestination(frameTranslator.convertDestination(this, destination)); + consumerInfo.setDestination(translator.convertDestination(this, destination)); - StompSubscription stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo); + StompSubscription stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION)); stompSubscription.setDestination(actualDest); String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE); @@ -380,7 +397,7 @@ public class ProtocolConverter { ActiveMQDestination destination = null; Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION); if (o != null) { - destination = frameTranslator.convertDestination(this, (String)o); + destination = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertDestination(this, (String)o); } String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID); @@ -533,12 +550,12 @@ public class ProtocolConverter { } public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException { - ActiveMQMessage msg = frameTranslator.convertFrame(this, command); + ActiveMQMessage msg = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertFrame(this, command); return msg; } public StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException { - return frameTranslator.convertMessage(this, message); + return findTranslator(message.getStringProperty(Stomp.Headers.TRANSFORMATION)).convertMessage(this, message); } public StompTransportFilter getTransportFilter() { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java index 0506778fc8..48c7a62e35 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java @@ -48,6 +48,7 @@ public interface Stomp { String RECEIPT_REQUESTED = "receipt"; String TRANSACTION = "transaction"; String CONTENT_LENGTH = "content-length"; + String TRANSFORMATION = "transformation"; public interface Response { String RECEIPT_ID = "receipt-id"; diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java index a8e4d8f628..ff5caa10ff 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java @@ -49,11 +49,13 @@ public class StompSubscription { private String ackMode = AUTO_ACK; private ActiveMQDestination destination; + private String transformation; - public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo) { + public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) { this.protocolConverter = stompTransport; this.subscriptionId = subscriptionId; this.consumerInfo = consumerInfo; + this.transformation = transformation; } void onMessageDispatch(MessageDispatch md) throws IOException, JMSException { @@ -68,7 +70,10 @@ public class StompSubscription { MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); protocolConverter.getTransportFilter().sendToActiveMQ(ack); } - + if (transformation != null) { + message.setReadOnlyProperties(false); + message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation); + } StompFrame command = protocolConverter.convertMessage(message); command.setAction(Stomp.Responses.MESSAGE); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java index 24501d08f7..65cde32578 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java @@ -30,7 +30,7 @@ import org.apache.commons.logging.LogFactory; /** * The StompTransportFilter normally sits on top of a TcpTransport that has been * configured with the StompWireFormat and is used to convert STOMP commands to - * ActiveMQ commands. All of the coversion work is done by delegating to the + * ActiveMQ commands. All of the conversion work is done by delegating to the * ProtocolConverter. * * @author chirino diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/XStreamFrameTranslator.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/XStreamFrameTranslator.java new file mode 100644 index 0000000000..20ba3ae5dc --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/XStreamFrameTranslator.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.stomp; + +import java.io.IOException; +import java.io.Serializable; +import java.io.StringReader; +import java.io.StringWriter; +import java.util.HashMap; +import java.util.Map; + +import javax.jms.JMSException; +import javax.jms.ObjectMessage; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.command.ActiveMQBytesMessage; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQObjectMessage; +import org.apache.activemq.command.ActiveMQTextMessage; + +import com.thoughtworks.xstream.XStream; +import com.thoughtworks.xstream.io.HierarchicalStreamReader; +import com.thoughtworks.xstream.io.HierarchicalStreamWriter; +import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver; +import com.thoughtworks.xstream.io.xml.PrettyPrintWriter; +import com.thoughtworks.xstream.io.xml.XppReader; + +/** + * Frame translator implementation that uses XStream to convert messages to and from XML and JSON + * @author Dejan Bosanac + */ +public class XStreamFrameTranslator extends LegacyFrameTranslator { + + XStream xStream = new XStream(); + + public ActiveMQMessage convertFrame(ProtocolConverter converter, + StompFrame command) throws JMSException, ProtocolException { + Map headers = command.getHeaders(); + ActiveMQMessage msg; + if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) { + msg = super.convertFrame(converter, command); + } else { + try { + ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage(); + Object obj = unmarshall(new String(command.getContent(), "UTF-8"), (String)headers.get(Stomp.Headers.TRANSFORMATION)); + objMsg.setObject((Serializable)obj); + msg = objMsg; + } catch (Throwable e) { + msg = super.convertFrame(converter, command); + } + } + FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this); + return msg; + } + + public StompFrame convertMessage(ProtocolConverter converter, + ActiveMQMessage message) throws IOException, JMSException { + if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) { + StompFrame command = new StompFrame(); + command.setAction(Stomp.Responses.MESSAGE); + Map headers = new HashMap(25); + command.setHeaders(headers); + + FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(converter, message, command, this); + ActiveMQObjectMessage msg = (ActiveMQObjectMessage)message.copy(); + command.setContent(marshall(msg.getObject(), headers.get(Stomp.Headers.TRANSFORMATION)).getBytes("UTF-8")); + return command; + + } else { + return super.convertMessage(converter, message); + } + } + + /** + * Marshalls the Object to a string using XML or JSON + * encoding + */ + protected String marshall(Serializable object, String transformation) throws JMSException { + StringWriter buffer = new StringWriter(); + HierarchicalStreamWriter out; + if (transformation.equalsIgnoreCase("jms-json")) { + out = new JettisonMappedXmlDriver().createWriter(buffer); + } else { + out = new PrettyPrintWriter(buffer); + } + getXStream().marshal(object, out); + return buffer.toString(); + } + + /** + * Unmarshalls the XML or JSON encoded message to an + * Object + */ + protected Object unmarshall(String text, String transformation) { + HierarchicalStreamReader in; + if (transformation.equalsIgnoreCase("jms-json")) { + in = new JettisonMappedXmlDriver().createReader(new StringReader(text)); + } else { + in = new XppReader(new StringReader(text)); + } + return getXStream().unmarshal(in); + } + + // Properties + // ------------------------------------------------------------------------- + public XStream getXStream() { + if (xStream == null) { + xStream = createXStream(); + } + return xStream; + } + + public void setXStream(XStream xStream) { + this.xStream = xStream; + } + + // Implementation methods + // ------------------------------------------------------------------------- + protected XStream createXStream() { + return new XStream(); + } + +} diff --git a/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-json b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-json new file mode 100644 index 0000000000..57060e7766 --- /dev/null +++ b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-json @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.transport.stomp.XStreamFrameTranslator \ No newline at end of file diff --git a/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-xml b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-xml new file mode 100644 index 0000000000..57060e7766 --- /dev/null +++ b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-xml @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.transport.stomp.XStreamFrameTranslator \ No newline at end of file diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/SamplePojo.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/SamplePojo.java new file mode 100644 index 0000000000..72e20799c2 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/SamplePojo.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.stomp; + +import java.io.Serializable; + +public class SamplePojo implements Serializable { + private String name; + private String city; + + public SamplePojo() { + } + + public SamplePojo(String name, String city) { + this.name = name; + this.city = city; + } + + + public String getCity() { + return city; + } + + public void setCity(String city) { + this.city = city; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java index 038e743f8e..a2e38b2a15 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java @@ -30,19 +30,16 @@ import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.broker.BrokerFactory; -import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTextMessage; -import org.apache.activemq.security.AuthorizationPlugin; -import org.apache.activemq.security.SimpleSecurityBrokerSystemTest; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -57,6 +54,15 @@ public class StompTest extends CombinationTestSupport { private Connection connection; private Session session; private ActiveMQQueue queue; + private String xmlText = "\n" + + " Dejan\n" + + " Belgrade\n" + + ""; + + private String jsonText = "{\"org.apache.activemq.transport.stomp.SamplePojo\":{" + + "\"name\":\"Dejan\"," + + "\"city\":\"Belgrade\"" + + "}}"; protected void setUp() throws Exception { broker = BrokerFactory.createBroker(new URI(confUri)); @@ -557,6 +563,172 @@ public class StompTest extends CombinationTestSupport { } + public void testTransformationUnknownTranslator() throws Exception { + MessageConsumer consumer = session.createConsumer(queue); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:test" + "\n\n" + "Hello World" + Stomp.NULL; + + stompConnection.sendFrame(frame); + + TextMessage message = (TextMessage)consumer.receive(1000); + assertNotNull(message); + assertEquals("Hello World", message.getText()); + } + + public void testTransformationFailed() throws Exception { + MessageConsumer consumer = session.createConsumer(queue); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:jms-xml" + "\n\n" + "Hello World" + Stomp.NULL; + + stompConnection.sendFrame(frame); + + TextMessage message = (TextMessage)consumer.receive(1000); + assertNotNull(message); + assertEquals("Hello World", message.getText()); + } + + public void testTransformationSendXML() throws Exception { + MessageConsumer consumer = session.createConsumer(queue); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:jms-xml" + "\n\n" + xmlText + Stomp.NULL; + + stompConnection.sendFrame(frame); + + ObjectMessage message = (ObjectMessage)consumer.receive(1000); + assertNotNull(message); + SamplePojo object = (SamplePojo)message.getObject(); + assertEquals("Dejan", object.getName()); + } + + public void testTransformationSendJSON() throws Exception { + MessageConsumer consumer = session.createConsumer(queue); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:jms-json" + "\n\n" + jsonText + Stomp.NULL; + + stompConnection.sendFrame(frame); + + ObjectMessage message = (ObjectMessage)consumer.receive(1000); + assertNotNull(message); + SamplePojo object = (SamplePojo)message.getObject(); + assertEquals("Dejan", object.getName()); + } + + public void testTransformationSubscribeXML() throws Exception { + + MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); + ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade")); + producer.send(message); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:jms-xml" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + + assertTrue(frame.trim().endsWith(xmlText)); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testTransformationReceiveJSON() throws Exception { + MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); + ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade")); + producer.send(message); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:jms-json" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + + assertTrue(frame.trim().endsWith(jsonText)); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testTransformationReceiveXML() throws Exception { + + MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); + ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade")); + message.setStringProperty("transformation", "jms-xml"); + producer.send(message); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + + assertTrue(frame.trim().endsWith(xmlText)); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testTransformationNotOverrideSubscription() throws Exception { + MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); + ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade")); + message.setStringProperty("transformation", "jms-xml"); + producer.send(message); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:jms-json" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + + assertTrue(frame.trim().endsWith(jsonText)); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + protected void assertClients(int expected) throws Exception { org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients(); int actual = clients.length;