diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index f77a67130a..3ab8516ace 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -1651,7 +1651,7 @@ public class BrokerService implements Service { } protected TransportConnector createTransportConnector(Broker broker, URI brokerURI) throws Exception { - TransportServer transport = TransportFactory.bind(getBrokerName(), brokerURI); + TransportServer transport = TransportFactory.bind(this, brokerURI); return new TransportConnector(broker, transport); } @@ -1769,6 +1769,7 @@ public class BrokerService implements Service { for (Iterator iter = getTransportConnectors().iterator(); iter.hasNext();) { TransportConnector connector = iter.next(); + connector.setBrokerService(this); al.add(startTransportConnector(connector)); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/SslBrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/SslBrokerService.java index 6253005cba..194641c499 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/SslBrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/SslBrokerService.java @@ -95,7 +95,7 @@ public class SslBrokerService extends BrokerService { return transportFactory.doBind(getBrokerName(), brokerURI); } else { // Else, business as usual. - return TransportFactory.bind(getBrokerName(), brokerURI); + return TransportFactory.bind(this, brokerURI); } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java index b70dcb7a51..67138c5078 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java @@ -45,7 +45,7 @@ import org.apache.commons.logging.LogFactory; * @org.apache.xbean.XBean * @version $Revision: 1.6 $ */ -public class TransportConnector implements Connector { +public class TransportConnector implements Connector, BrokerServiceAware { private static final Log LOG = LogFactory.getLog(TransportConnector.class); @@ -53,6 +53,7 @@ public class TransportConnector implements Connector { protected TransportStatusDetector statusDector; private Broker broker; + private BrokerService brokerService; private TransportServer server; private URI uri; private BrokerInfo brokerInfo = new BrokerInfo(); @@ -282,7 +283,11 @@ public class TransportConnector implements Connector { if (broker == null) { throw new IllegalArgumentException("You must specify the broker property. Maybe this connector should be added to a broker?"); } - return TransportFactory.bind(broker.getBrokerId().getValue(), uri); + if (brokerService != null) { + return TransportFactory.bind(brokerService, uri); + } else { + return TransportFactory.bind(broker.getBrokerId().getValue(), uri); + } } public DiscoveryAgent getDiscoveryAgent() throws IOException { @@ -375,4 +380,8 @@ public class TransportConnector implements Connector { public void setEnableStatusMonitor(boolean enableStatusMonitor) { this.enableStatusMonitor = enableStatusMonitor; } + + public void setBrokerService(BrokerService brokerService) { + this.brokerService = brokerService; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/proxy/ProxyConnector.java b/activemq-core/src/main/java/org/apache/activemq/proxy/ProxyConnector.java index 228fa2e667..c30bf7f9ae 100644 --- a/activemq-core/src/main/java/org/apache/activemq/proxy/ProxyConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/proxy/ProxyConnector.java @@ -126,7 +126,7 @@ public class ProxyConnector implements Service { if (bind == null) { throw new IllegalArgumentException("You must specify either a server or the bind property"); } - return TransportFactory.bind(null, bind); + return TransportFactory.bind((String)null, bind); } private Transport createRemoteTransport() throws Exception { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java index ee692df9a2..516b4eba1a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java @@ -26,6 +26,8 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.util.FactoryFinder; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IntrospectionSupport; @@ -105,6 +107,14 @@ public abstract class TransportFactory { TransportFactory tf = findTransportFactory(location); return tf.doBind(brokerId, location); } + + public static TransportServer bind(BrokerService brokerService, URI location) throws IOException { + TransportFactory tf = findTransportFactory(location); + if (tf instanceof BrokerServiceAware) { + ((BrokerServiceAware)tf).setBrokerService(brokerService); + } + return tf.doBind(brokerService.getBrokerName(), location); + } public Transport doConnect(URI location) throws Exception { try { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java new file mode 100644 index 0000000000..9922cbbb3c --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.stomp; + +import java.io.IOException; +import java.io.Serializable; +import java.io.StringReader; +import java.io.StringWriter; +import java.util.HashMap; +import java.util.Map; + +import javax.jms.JMSException; + +import org.apache.activemq.command.ActiveMQMapMessage; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQObjectMessage; +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; + +import com.thoughtworks.xstream.XStream; +import com.thoughtworks.xstream.io.HierarchicalStreamReader; +import com.thoughtworks.xstream.io.HierarchicalStreamWriter; +import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver; +import com.thoughtworks.xstream.io.xml.PrettyPrintWriter; +import com.thoughtworks.xstream.io.xml.XppReader; + +/** + * Frame translator implementation that uses XStream to convert messages to and + * from XML and JSON + * + * @author Dejan Bosanac + */ +public class JmsFrameTranslator extends LegacyFrameTranslator implements + ApplicationContextAware { + + XStream xStream = null; + ApplicationContext applicationContext; + + public ActiveMQMessage convertFrame(ProtocolConverter converter, + StompFrame command) throws JMSException, ProtocolException { + Map headers = command.getHeaders(); + ActiveMQMessage msg; + String transformation = (String) headers.get(Stomp.Headers.TRANSFORMATION); + if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH) || transformation.equals(Stomp.Transformations.JMS_BYTE.toString())) { + msg = super.convertFrame(converter, command); + } else { + HierarchicalStreamReader in; + + try { + String text = new String(command.getContent(), "UTF-8"); + switch (Stomp.Transformations.getValue(transformation)) { + case JMS_OBJECT_XML: + in = new XppReader(new StringReader(text)); + msg = createObjectMessage(in); + break; + case JMS_OBJECT_JSON: + in = new JettisonMappedXmlDriver().createReader(new StringReader(text)); + msg = createObjectMessage(in); + break; + case JMS_MAP_XML: + in = new XppReader(new StringReader(text)); + msg = createMapMessage(in); + break; + case JMS_MAP_JSON: + in = new JettisonMappedXmlDriver().createReader(new StringReader(text)); + msg = createMapMessage(in); + break; + default: + throw new Exception("Unkown transformation: " + transformation); + } + } catch (Throwable e) { + command.getHeaders().put(Stomp.Headers.TRANSFORMATION_ERROR, e.getMessage()); + msg = super.convertFrame(converter, command); + } + } + FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this); + return msg; + } + + public StompFrame convertMessage(ProtocolConverter converter, + ActiveMQMessage message) throws IOException, JMSException { + if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) { + StompFrame command = new StompFrame(); + command.setAction(Stomp.Responses.MESSAGE); + Map headers = new HashMap(25); + command.setHeaders(headers); + + FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame( + converter, message, command, this); + ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy(); + command.setContent(marshall(msg.getObject(), + headers.get(Stomp.Headers.TRANSFORMATION)) + .getBytes("UTF-8")); + return command; + + } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) { + StompFrame command = new StompFrame(); + command.setAction(Stomp.Responses.MESSAGE); + Map headers = new HashMap(25); + command.setHeaders(headers); + + FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame( + converter, message, command, this); + ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy(); + command.setContent(marshall((Serializable)msg.getContentMap(), + headers.get(Stomp.Headers.TRANSFORMATION)) + .getBytes("UTF-8")); + return command; + } else { + return super.convertMessage(converter, message); + } + } + + /** + * Marshalls the Object to a string using XML or JSON encoding + */ + protected String marshall(Serializable object, String transformation) + throws JMSException { + StringWriter buffer = new StringWriter(); + HierarchicalStreamWriter out; + if (transformation.toLowerCase().endsWith("json")) { + out = new JettisonMappedXmlDriver().createWriter(buffer); + } else { + out = new PrettyPrintWriter(buffer); + } + getXStream().marshal(object, out); + return buffer.toString(); + } + + protected ActiveMQObjectMessage createObjectMessage(HierarchicalStreamReader in) throws JMSException { + ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage(); + Object obj = getXStream().unmarshal(in); + objMsg.setObject((Serializable) obj); + return objMsg; + } + + protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException { + ActiveMQMapMessage mapMsg = new ActiveMQMapMessage(); + Map map = (Map)getXStream().unmarshal(in); + for (String key : map.keySet()) { + mapMsg.setObject(key, map.get(key)); + } + return mapMsg; + } + + + + // Properties + // ------------------------------------------------------------------------- + public XStream getXStream() { + if (xStream == null) { + xStream = createXStream(); + } + return xStream; + } + + public void setXStream(XStream xStream) { + this.xStream = xStream; + } + + // Implementation methods + // ------------------------------------------------------------------------- + protected XStream createXStream() { + XStream xstream = null; + if (applicationContext != null) { + String[] names = applicationContext + .getBeanNamesForType(XStream.class); + for (int i = 0; i < names.length; i++) { + String name = names[i]; + xstream = (XStream) applicationContext.getBean(name); + if (xstream != null) { + break; + } + } + } + + if (xstream == null) { + System.out.println("is null"); + xstream = new XStream(); + } + return xstream; + + } + + public void setApplicationContext(ApplicationContext applicationContext) + throws BeansException { + this.applicationContext = applicationContext; + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java index 7eca4d8514..515a0f0a13 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java @@ -57,6 +57,8 @@ import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.LongSequenceGenerator; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; /** * @author chirino @@ -86,10 +88,12 @@ public class ProtocolConverter { private final AtomicBoolean connected = new AtomicBoolean(false); private final FrameTranslator frameTranslator; private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/"); + private final ApplicationContext applicationContext; - public ProtocolConverter(StompTransportFilter stompTransportFilter, FrameTranslator translator) { + public ProtocolConverter(StompTransportFilter stompTransportFilter, FrameTranslator translator, ApplicationContext applicationContext) { this.transportFilter = stompTransportFilter; this.frameTranslator = translator; + this.applicationContext = applicationContext; } protected int generateCommandId() { @@ -140,6 +144,9 @@ public class ProtocolConverter { if (header != null) { translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER .newInstance(header); + if (translator instanceof ApplicationContextAware) { + ((ApplicationContextAware)translator).setApplicationContext(applicationContext); + } } } catch (Exception ignore) { // if anything goes wrong use the default translator @@ -554,8 +561,12 @@ public class ProtocolConverter { return msg; } - public StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException { - return findTranslator(message.getStringProperty(Stomp.Headers.TRANSFORMATION)).convertMessage(this, message); + public StompFrame convertMessage(ActiveMQMessage message, boolean ignoreTransformation) throws IOException, JMSException { + if (ignoreTransformation == true) { + return frameTranslator.convertMessage(this, message); + } else { + return findTranslator(message.getStringProperty(Stomp.Headers.TRANSFORMATION)).convertMessage(this, message); + } } public StompTransportFilter getTransportFilter() { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java index 48c7a62e35..265e2d234c 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java @@ -49,6 +49,7 @@ public interface Stomp { String TRANSACTION = "transaction"; String CONTENT_LENGTH = "content-length"; String TRANSFORMATION = "transformation"; + String TRANSFORMATION_ERROR = "transformation-error"; public interface Response { String RECEIPT_ID = "receipt-id"; @@ -114,4 +115,16 @@ public interface Stomp { String MESSAGE_ID = "message-id"; } } + + public enum Transformations { + JMS_BYTE, JMS_OBJECT_XML, JMS_OBJECT_JSON, JMS_MAP_XML, JMS_MAP_JSON; + + public String toString() { + return name().replaceAll("_", "-").toLowerCase(); + } + + public static Transformations getValue(String value) { + return valueOf(value.replaceAll("-", "_").toUpperCase()); + } + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java index 025af7aff1..eca3375181 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java @@ -18,25 +18,38 @@ package org.apache.activemq.transport.stomp; import java.util.Map; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.tcp.SslTransportFactory; import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.wireformat.WireFormat; +import org.apache.activemq.xbean.XBeanBrokerService; +import org.springframework.context.ApplicationContext; /** * A STOMP over SSL transport factory * * @version $Revision$ */ -public class StompSslTransportFactory extends SslTransportFactory { +public class StompSslTransportFactory extends SslTransportFactory implements BrokerServiceAware { + + private ApplicationContext applicationContext = null; protected String getDefaultWireFormatType() { return "stomp"; } public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { - transport = new StompTransportFilter(transport, new LegacyFrameTranslator()); + transport = new StompTransportFilter(transport, new LegacyFrameTranslator(), applicationContext); IntrospectionSupport.setProperties(transport, options); return super.compositeConfigure(transport, format, options); } + + public void setBrokerService(BrokerService brokerService) { + if (brokerService instanceof XBeanBrokerService) { + this.applicationContext = ((XBeanBrokerService)brokerService).getApplicationContext(); + } + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java index ff5caa10ff..2748576fbc 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java @@ -70,11 +70,18 @@ public class StompSubscription { MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); protocolConverter.getTransportFilter().sendToActiveMQ(ack); } + + boolean ignoreTransformation = false; + if (transformation != null) { message.setReadOnlyProperties(false); message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation); + } else { + if (message.getStringProperty(Stomp.Headers.TRANSFORMATION) != null) { + ignoreTransformation = true; + } } - StompFrame command = protocolConverter.convertMessage(message); + StompFrame command = protocolConverter.convertMessage(message, ignoreTransformation); command.setAction(Stomp.Responses.MESSAGE); if (subscriptionId != null) { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java index 009f6a0156..c175e2c335 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java @@ -18,24 +18,30 @@ package org.apache.activemq.transport.stomp; import java.util.Map; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.tcp.TcpTransportFactory; import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.wireformat.WireFormat; +import org.apache.activemq.xbean.XBeanBrokerService; +import org.springframework.context.ApplicationContext; /** * A STOMP transport factory * * @version $Revision: 1.1.1.1 $ */ -public class StompTransportFactory extends TcpTransportFactory { +public class StompTransportFactory extends TcpTransportFactory implements BrokerServiceAware { + private ApplicationContext applicationContext = null; + protected String getDefaultWireFormatType() { return "stomp"; } public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { - transport = new StompTransportFilter(transport, new LegacyFrameTranslator()); + transport = new StompTransportFilter(transport, new LegacyFrameTranslator(), applicationContext); IntrospectionSupport.setProperties(transport, options); return super.compositeConfigure(transport, format, options); } @@ -45,4 +51,10 @@ public class StompTransportFactory extends TcpTransportFactory { // packets return false; } + + public void setBrokerService(BrokerService brokerService) { + if (brokerService instanceof XBeanBrokerService) { + this.applicationContext = ((XBeanBrokerService)brokerService).getApplicationContext(); + } + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java index 65cde32578..0f0f24f292 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java @@ -26,6 +26,7 @@ import org.apache.activemq.transport.TransportFilter; import org.apache.activemq.util.IOExceptionSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.context.ApplicationContext; /** * The StompTransportFilter normally sits on top of a TcpTransport that has been @@ -46,10 +47,10 @@ public class StompTransportFilter extends TransportFilter { private boolean trace; - public StompTransportFilter(Transport next, FrameTranslator translator) { + public StompTransportFilter(Transport next, FrameTranslator translator, ApplicationContext applicationContext) { super(next); this.frameTranslator = translator; - this.protocolConverter = new ProtocolConverter(this, translator); + this.protocolConverter = new ProtocolConverter(this, translator, applicationContext); } public void oneway(Object o) throws IOException { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/XStreamFrameTranslator.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/XStreamFrameTranslator.java index 20ba3ae5dc..e69de29bb2 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/XStreamFrameTranslator.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/XStreamFrameTranslator.java @@ -1,138 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.stomp; - -import java.io.IOException; -import java.io.Serializable; -import java.io.StringReader; -import java.io.StringWriter; -import java.util.HashMap; -import java.util.Map; - -import javax.jms.JMSException; -import javax.jms.ObjectMessage; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.activemq.command.ActiveMQBytesMessage; -import org.apache.activemq.command.ActiveMQMessage; -import org.apache.activemq.command.ActiveMQObjectMessage; -import org.apache.activemq.command.ActiveMQTextMessage; - -import com.thoughtworks.xstream.XStream; -import com.thoughtworks.xstream.io.HierarchicalStreamReader; -import com.thoughtworks.xstream.io.HierarchicalStreamWriter; -import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver; -import com.thoughtworks.xstream.io.xml.PrettyPrintWriter; -import com.thoughtworks.xstream.io.xml.XppReader; - -/** - * Frame translator implementation that uses XStream to convert messages to and from XML and JSON - * @author Dejan Bosanac - */ -public class XStreamFrameTranslator extends LegacyFrameTranslator { - - XStream xStream = new XStream(); - - public ActiveMQMessage convertFrame(ProtocolConverter converter, - StompFrame command) throws JMSException, ProtocolException { - Map headers = command.getHeaders(); - ActiveMQMessage msg; - if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) { - msg = super.convertFrame(converter, command); - } else { - try { - ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage(); - Object obj = unmarshall(new String(command.getContent(), "UTF-8"), (String)headers.get(Stomp.Headers.TRANSFORMATION)); - objMsg.setObject((Serializable)obj); - msg = objMsg; - } catch (Throwable e) { - msg = super.convertFrame(converter, command); - } - } - FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this); - return msg; - } - - public StompFrame convertMessage(ProtocolConverter converter, - ActiveMQMessage message) throws IOException, JMSException { - if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) { - StompFrame command = new StompFrame(); - command.setAction(Stomp.Responses.MESSAGE); - Map headers = new HashMap(25); - command.setHeaders(headers); - - FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(converter, message, command, this); - ActiveMQObjectMessage msg = (ActiveMQObjectMessage)message.copy(); - command.setContent(marshall(msg.getObject(), headers.get(Stomp.Headers.TRANSFORMATION)).getBytes("UTF-8")); - return command; - - } else { - return super.convertMessage(converter, message); - } - } - - /** - * Marshalls the Object to a string using XML or JSON - * encoding - */ - protected String marshall(Serializable object, String transformation) throws JMSException { - StringWriter buffer = new StringWriter(); - HierarchicalStreamWriter out; - if (transformation.equalsIgnoreCase("jms-json")) { - out = new JettisonMappedXmlDriver().createWriter(buffer); - } else { - out = new PrettyPrintWriter(buffer); - } - getXStream().marshal(object, out); - return buffer.toString(); - } - - /** - * Unmarshalls the XML or JSON encoded message to an - * Object - */ - protected Object unmarshall(String text, String transformation) { - HierarchicalStreamReader in; - if (transformation.equalsIgnoreCase("jms-json")) { - in = new JettisonMappedXmlDriver().createReader(new StringReader(text)); - } else { - in = new XppReader(new StringReader(text)); - } - return getXStream().unmarshal(in); - } - - // Properties - // ------------------------------------------------------------------------- - public XStream getXStream() { - if (xStream == null) { - xStream = createXStream(); - } - return xStream; - } - - public void setXStream(XStream xStream) { - this.xStream = xStream; - } - - // Implementation methods - // ------------------------------------------------------------------------- - protected XStream createXStream() { - return new XStream(); - } - -} diff --git a/activemq-core/src/main/java/org/apache/activemq/util/XStreamFactoryBean.java b/activemq-core/src/main/java/org/apache/activemq/util/XStreamFactoryBean.java new file mode 100644 index 0000000000..e1539781bc --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/util/XStreamFactoryBean.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.util; + +import java.util.Iterator; +import java.util.Map; + +import org.springframework.beans.factory.FactoryBean; +import org.springframework.beans.propertyeditors.ClassEditor; +import org.springframework.util.Assert; + +import com.thoughtworks.xstream.XStream; +import com.thoughtworks.xstream.annotations.Annotations; +import com.thoughtworks.xstream.converters.Converter; +import com.thoughtworks.xstream.converters.ConverterMatcher; +import com.thoughtworks.xstream.converters.SingleValueConverter; + +public class XStreamFactoryBean implements FactoryBean { + + XStream xstream = new XStream(); + + /** + * Sets the Converters or SingleValueConverters to be registered with the + * XStream instance. + * + * @see Converter + * @see SingleValueConverter + */ + public void setConverters(ConverterMatcher[] converters) { + for (int i = 0; i < converters.length; i++) { + if (converters[i] instanceof Converter) { + xstream.registerConverter((Converter) converters[i], i); + } + else if (converters[i] instanceof SingleValueConverter) { + xstream.registerConverter((SingleValueConverter) converters[i], i); + } + else { + throw new IllegalArgumentException("Invalid ConverterMatcher [" + converters[i] + "]"); + } + } + } + + /** + * Set a alias/type map, consisting of string aliases mapped to Class instances (or Strings to be + * converted to Class instances). + * + * @see org.springframework.beans.propertyeditors.ClassEditor + */ + public void setAliases(Map aliases) { + for (Iterator iterator = aliases.entrySet().iterator(); iterator.hasNext();) { + Map.Entry entry = (Map.Entry) iterator.next(); + // Check whether we need to convert from String to Class. + Class type; + if (entry.getValue() instanceof Class) { + type = (Class) entry.getValue(); + } + else { + ClassEditor editor = new ClassEditor(); + editor.setAsText(String.valueOf(entry.getValue())); + type = (Class) editor.getValue(); + } + xstream.alias((String) entry.getKey(), type); + } + } + + /** + * Sets the XStream mode. + * + * @see XStream#XPATH_REFERENCES + * @see XStream#ID_REFERENCES + * @see XStream#NO_REFERENCES + */ + public void setMode(int mode) { + xstream.setMode(mode); + } + + /** + * Sets the classes, for which mappings will be read from class-level JDK 1.5+ annotation metadata. + * + * @see Annotations#configureAliases(XStream, Class[]) + */ + public void setAnnotatedClass(Class annotatedClass) { + Assert.notNull(annotatedClass, "'annotatedClass' must not be null"); + Annotations.configureAliases(xstream, annotatedClass); + } + + /** + * Sets annotated classes, for which aliases will be read from class-level JDK 1.5+ annotation metadata. + * + * @see Annotations#configureAliases(XStream, Class[]) + */ + public void setAnnotatedClasses(Class[] annotatedClasses) { + Assert.notEmpty(annotatedClasses, "'annotatedClasses' must not be empty"); + Annotations.configureAliases(xstream, annotatedClasses); + } + + public Object getObject() throws Exception { + return xstream; + } + + public Class getObjectType() { + return XStream.class; + } + + public boolean isSingleton() { + return true; + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/xbean/XBeanBrokerFactory.java b/activemq-core/src/main/java/org/apache/activemq/xbean/XBeanBrokerFactory.java index b299705127..5ca5c86846 100644 --- a/activemq-core/src/main/java/org/apache/activemq/xbean/XBeanBrokerFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/xbean/XBeanBrokerFactory.java @@ -29,6 +29,7 @@ import org.apache.xbean.spring.context.ResourceXmlApplicationContext; import org.apache.xbean.spring.context.impl.URIEditor; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.FileSystemResource; import org.springframework.core.io.Resource; @@ -70,7 +71,11 @@ public class XBeanBrokerFactory implements BrokerFactoryHandler { if (broker == null) { throw new IllegalArgumentException("The configuration has no BrokerService instance for resource: " + config); } - + + if (broker instanceof ApplicationContextAware) { + ((ApplicationContextAware)broker).setApplicationContext(context); + } + // TODO warning resources from the context may not be closed down! return broker; diff --git a/activemq-core/src/main/java/org/apache/activemq/xbean/XBeanBrokerService.java b/activemq-core/src/main/java/org/apache/activemq/xbean/XBeanBrokerService.java index 2b1a7e0ec7..f07c72b44b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/xbean/XBeanBrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/xbean/XBeanBrokerService.java @@ -17,8 +17,11 @@ package org.apache.activemq.xbean; import org.apache.activemq.broker.BrokerService; +import org.springframework.beans.BeansException; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; /** * An ActiveMQ Message Broker. It consists of a number of transport @@ -34,9 +37,10 @@ import org.springframework.beans.factory.InitializingBean; * {code} * @version $Revision: 1.1 $ */ -public class XBeanBrokerService extends BrokerService implements InitializingBean, DisposableBean { +public class XBeanBrokerService extends BrokerService implements InitializingBean, DisposableBean, ApplicationContextAware { private boolean start = true; + private ApplicationContext applicationContext = null; public XBeanBrokerService() { } @@ -63,4 +67,15 @@ public class XBeanBrokerService extends BrokerService implements InitializingBea public void setStart(boolean start) { this.start = start; } + + public void setApplicationContext(ApplicationContext applicationContext) + throws BeansException { + this.applicationContext = applicationContext; + } + + public ApplicationContext getApplicationContext() { + return applicationContext; + } + + } diff --git a/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-byte b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-byte new file mode 100644 index 0000000000..e7fb9adbed --- /dev/null +++ b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-byte @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.transport.stomp.JmsFrameTranslator \ No newline at end of file diff --git a/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-json b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-json index 57060e7766..e69de29bb2 100644 --- a/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-json +++ b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-json @@ -1,17 +0,0 @@ -## --------------------------------------------------------------------------- -## Licensed to the Apache Software Foundation (ASF) under one or more -## contributor license agreements. See the NOTICE file distributed with -## this work for additional information regarding copyright ownership. -## The ASF licenses this file to You under the Apache License, Version 2.0 -## (the "License"); you may not use this file except in compliance with -## the License. You may obtain a copy of the License at -## -## http://www.apache.org/licenses/LICENSE-2.0 -## -## Unless required by applicable law or agreed to in writing, software -## distributed under the License is distributed on an "AS IS" BASIS, -## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -## See the License for the specific language governing permissions and -## limitations under the License. -## --------------------------------------------------------------------------- -class=org.apache.activemq.transport.stomp.XStreamFrameTranslator \ No newline at end of file diff --git a/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-map-json b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-map-json new file mode 100644 index 0000000000..e7fb9adbed --- /dev/null +++ b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-map-json @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.transport.stomp.JmsFrameTranslator \ No newline at end of file diff --git a/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-map-xml b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-map-xml new file mode 100644 index 0000000000..e7fb9adbed --- /dev/null +++ b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-map-xml @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.transport.stomp.JmsFrameTranslator \ No newline at end of file diff --git a/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-object-json b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-object-json new file mode 100644 index 0000000000..e7fb9adbed --- /dev/null +++ b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-object-json @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.transport.stomp.JmsFrameTranslator \ No newline at end of file diff --git a/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-object-xml b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-object-xml new file mode 100644 index 0000000000..e7fb9adbed --- /dev/null +++ b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-object-xml @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.transport.stomp.JmsFrameTranslator \ No newline at end of file diff --git a/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-xml b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-xml index 57060e7766..e69de29bb2 100644 --- a/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-xml +++ b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-xml @@ -1,17 +0,0 @@ -## --------------------------------------------------------------------------- -## Licensed to the Apache Software Foundation (ASF) under one or more -## contributor license agreements. See the NOTICE file distributed with -## this work for additional information regarding copyright ownership. -## The ASF licenses this file to You under the Apache License, Version 2.0 -## (the "License"); you may not use this file except in compliance with -## the License. You may obtain a copy of the License at -## -## http://www.apache.org/licenses/LICENSE-2.0 -## -## Unless required by applicable law or agreed to in writing, software -## distributed under the License is distributed on an "AS IS" BASIS, -## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -## See the License for the specific language governing permissions and -## limitations under the License. -## --------------------------------------------------------------------------- -class=org.apache.activemq.transport.stomp.XStreamFrameTranslator \ No newline at end of file diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/SamplePojo.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/SamplePojo.java index 72e20799c2..7671080d1c 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/SamplePojo.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/SamplePojo.java @@ -18,8 +18,13 @@ package org.apache.activemq.transport.stomp; import java.io.Serializable; +import com.thoughtworks.xstream.annotations.XStreamAlias; + +@XStreamAlias("pojo") public class SamplePojo implements Serializable { + @XStreamAlias("name") private String name; + @XStreamAlias("city") private String city; public SamplePojo() { diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java index a2e38b2a15..b5364724d7 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java @@ -28,6 +28,8 @@ import java.util.regex.Pattern; import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; @@ -54,16 +56,34 @@ public class StompTest extends CombinationTestSupport { private Connection connection; private Session session; private ActiveMQQueue queue; - private String xmlText = "\n" - + " Dejan\n" - + " Belgrade\n" - + ""; - - private String jsonText = "{\"org.apache.activemq.transport.stomp.SamplePojo\":{" - + "\"name\":\"Dejan\"," + private String xmlObject = "\n" + + " Dejan\n" + + " Belgrade\n" + + ""; + + private String xmlMap = "\n" + + " \n" + + " name\n" + + " Dejan\n" + + " \n" + + " \n" + + " city\n" + + " Belgrade\n" + + " \n" + + "\n"; + + private String jsonObject = "{\"pojo\":{" + + "\"name\":\"Dejan\"," + "\"city\":\"Belgrade\"" - + "}}"; - + + "}}"; + + private String jsonMap = "{\"map\":{" + + "\"entry\":[" + + "{\"string\":[\"name\",\"Dejan\"]}," + + "{\"string\":[\"city\",\"Belgrade\"]}" + + "]" + + "}}"; + protected void setUp() throws Exception { broker = BrokerFactory.createBroker(new URI(confUri)); broker.start(); @@ -301,7 +321,6 @@ public class StompTest extends CombinationTestSupport { frame = stompConnection.receiveFrame(); assertTrue(frame.startsWith("MESSAGE")); - // System.out.println("out: "+frame); frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); @@ -590,16 +609,17 @@ public class StompTest extends CombinationTestSupport { frame = stompConnection.receiveFrame(); assertTrue(frame.startsWith("CONNECTED")); - frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:jms-xml" + "\n\n" + "Hello World" + Stomp.NULL; + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + "Hello World" + Stomp.NULL; stompConnection.sendFrame(frame); TextMessage message = (TextMessage)consumer.receive(1000); assertNotNull(message); + assertNotNull(message.getStringProperty(Stomp.Headers.TRANSFORMATION_ERROR)); assertEquals("Hello World", message.getText()); } - public void testTransformationSendXML() throws Exception { + public void testTransformationSendXMLObject() throws Exception { MessageConsumer consumer = session.createConsumer(queue); String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; @@ -608,7 +628,7 @@ public class StompTest extends CombinationTestSupport { frame = stompConnection.receiveFrame(); assertTrue(frame.startsWith("CONNECTED")); - frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:jms-xml" + "\n\n" + xmlText + Stomp.NULL; + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + xmlObject + Stomp.NULL; stompConnection.sendFrame(frame); @@ -618,7 +638,7 @@ public class StompTest extends CombinationTestSupport { assertEquals("Dejan", object.getName()); } - public void testTransformationSendJSON() throws Exception { + public void testTransformationSendJSONObject() throws Exception { MessageConsumer consumer = session.createConsumer(queue); String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; @@ -627,7 +647,7 @@ public class StompTest extends CombinationTestSupport { frame = stompConnection.receiveFrame(); assertTrue(frame.startsWith("CONNECTED")); - frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:jms-json" + "\n\n" + jsonText + Stomp.NULL; + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_JSON + "\n\n" + jsonObject + Stomp.NULL; stompConnection.sendFrame(frame); @@ -649,18 +669,18 @@ public class StompTest extends CombinationTestSupport { frame = stompConnection.receiveFrame(); assertTrue(frame.startsWith("CONNECTED")); - frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:jms-xml" + "\n\n" + Stomp.NULL; + frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); frame = stompConnection.receiveFrame(); - assertTrue(frame.trim().endsWith(xmlText)); - + assertTrue(frame.trim().endsWith(xmlObject)); + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); } - public void testTransformationReceiveJSON() throws Exception { + public void testTransformationReceiveJSONObject() throws Exception { MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade")); producer.send(message); @@ -671,22 +691,21 @@ public class StompTest extends CombinationTestSupport { frame = stompConnection.receiveFrame(); assertTrue(frame.startsWith("CONNECTED")); - frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:jms-json" + "\n\n" + Stomp.NULL; + frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_JSON + "\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); frame = stompConnection.receiveFrame(); - assertTrue(frame.trim().endsWith(jsonText)); + assertTrue(frame.trim().endsWith(jsonObject)); frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); } - public void testTransformationReceiveXML() throws Exception { + public void testTransformationReceiveXMLObject() throws Exception { MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade")); - message.setStringProperty("transformation", "jms-xml"); producer.send(message); String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; @@ -695,12 +714,12 @@ public class StompTest extends CombinationTestSupport { frame = stompConnection.receiveFrame(); assertTrue(frame.startsWith("CONNECTED")); - frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n\n" + Stomp.NULL; + frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); frame = stompConnection.receiveFrame(); - assertTrue(frame.trim().endsWith(xmlText)); + assertTrue(frame.trim().endsWith(xmlObject)); frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); @@ -709,7 +728,116 @@ public class StompTest extends CombinationTestSupport { public void testTransformationNotOverrideSubscription() throws Exception { MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade")); - message.setStringProperty("transformation", "jms-xml"); + message.setStringProperty("transformation", Stomp.Transformations.JMS_OBJECT_XML.toString()); + producer.send(message); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_JSON + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + + assertTrue(frame.trim().endsWith(jsonObject)); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testTransformationIgnoreTransformation() throws Exception { + MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); + ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade")); + message.setStringProperty("transformation", Stomp.Transformations.JMS_OBJECT_XML.toString()); + producer.send(message); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + + assertTrue(frame.endsWith("\n\n")); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testTransformationSendXMLMap() throws Exception { + MessageConsumer consumer = session.createConsumer(queue); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_MAP_XML + "\n\n" + xmlMap + Stomp.NULL; + + stompConnection.sendFrame(frame); + + MapMessage message = (MapMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals(message.getString("name"), "Dejan"); + } + + public void testTransformationSendJSONMap() throws Exception { + MessageConsumer consumer = session.createConsumer(queue); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_MAP_JSON + "\n\n" + jsonMap + Stomp.NULL; + + stompConnection.sendFrame(frame); + + MapMessage message = (MapMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals(message.getString("name"), "Dejan"); + } + + public void testTransformationReceiveXMLMap() throws Exception { + + MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); + MapMessage message = session.createMapMessage(); + message.setString("name", "Dejan"); + message.setString("city", "Belgrade"); + producer.send(message); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto\n" + "transformation:" + Stomp.Transformations.JMS_MAP_XML + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + + assertTrue(frame.trim().endsWith(xmlMap.trim())); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testTransformationReceiveJSONMap() throws Exception { + + MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); + MapMessage message = session.createMapMessage(); + message.setString("name", "Dejan"); + message.setString("city", "Belgrade"); producer.send(message); String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; @@ -718,12 +846,12 @@ public class StompTest extends CombinationTestSupport { frame = stompConnection.receiveFrame(); assertTrue(frame.startsWith("CONNECTED")); - frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:jms-json" + "\n\n" + Stomp.NULL; + frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto\n" + "transformation:" + Stomp.Transformations.JMS_MAP_JSON + "\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); frame = stompConnection.receiveFrame(); - assertTrue(frame.trim().endsWith(jsonText)); + assertTrue(frame.trim().endsWith(jsonMap.trim())); frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); diff --git a/activemq-core/src/test/java/org/apache/activemq/xbean/ApplicationContextAwareTest.java b/activemq-core/src/test/java/org/apache/activemq/xbean/ApplicationContextAwareTest.java new file mode 100644 index 0000000000..63e1fc8f86 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/xbean/ApplicationContextAwareTest.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.xbean; + +import java.net.URI; + +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; + + +public class ApplicationContextAwareTest extends MultipleTestsWithEmbeddedBrokerTest { + + protected BrokerService createBroker() throws Exception { + return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/xbean/activemq.xml")); + } + + public void testContextAware() { + assertTrue(broker instanceof XBeanBrokerService); + assertTrue(broker instanceof ApplicationContextAware); + ApplicationContext context = ((XBeanBrokerService)broker).getApplicationContext(); + assertTrue(context.containsBean("org.apache.activemq.xbean.XBeanBrokerService")); + } +} diff --git a/activemq-core/src/test/resources/org/apache/activemq/transport/stomp/stomp-auth-broker.xml b/activemq-core/src/test/resources/org/apache/activemq/transport/stomp/stomp-auth-broker.xml index 29ee629b05..73b97e06e2 100644 --- a/activemq-core/src/test/resources/org/apache/activemq/transport/stomp/stomp-auth-broker.xml +++ b/activemq-core/src/test/resources/org/apache/activemq/transport/stomp/stomp-auth-broker.xml @@ -20,6 +20,10 @@ + + + org.apache.activemq.transport.stomp.SamplePojo +