mirror of https://github.com/apache/activemq.git
Applying patch: https://issues.apache.org/activemq/browse/AMQ-1567 Thanks Dejan!
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@645574 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9d7bd247ad
commit
39100564da
|
@ -1651,7 +1651,7 @@ public class BrokerService implements Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected TransportConnector createTransportConnector(Broker broker, URI brokerURI) throws Exception {
|
protected TransportConnector createTransportConnector(Broker broker, URI brokerURI) throws Exception {
|
||||||
TransportServer transport = TransportFactory.bind(getBrokerName(), brokerURI);
|
TransportServer transport = TransportFactory.bind(this, brokerURI);
|
||||||
return new TransportConnector(broker, transport);
|
return new TransportConnector(broker, transport);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1769,6 +1769,7 @@ public class BrokerService implements Service {
|
||||||
|
|
||||||
for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
|
for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
|
||||||
TransportConnector connector = iter.next();
|
TransportConnector connector = iter.next();
|
||||||
|
connector.setBrokerService(this);
|
||||||
al.add(startTransportConnector(connector));
|
al.add(startTransportConnector(connector));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -95,7 +95,7 @@ public class SslBrokerService extends BrokerService {
|
||||||
return transportFactory.doBind(getBrokerName(), brokerURI);
|
return transportFactory.doBind(getBrokerName(), brokerURI);
|
||||||
} else {
|
} else {
|
||||||
// Else, business as usual.
|
// Else, business as usual.
|
||||||
return TransportFactory.bind(getBrokerName(), brokerURI);
|
return TransportFactory.bind(this, brokerURI);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,7 +45,7 @@ import org.apache.commons.logging.LogFactory;
|
||||||
* @org.apache.xbean.XBean
|
* @org.apache.xbean.XBean
|
||||||
* @version $Revision: 1.6 $
|
* @version $Revision: 1.6 $
|
||||||
*/
|
*/
|
||||||
public class TransportConnector implements Connector {
|
public class TransportConnector implements Connector, BrokerServiceAware {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(TransportConnector.class);
|
private static final Log LOG = LogFactory.getLog(TransportConnector.class);
|
||||||
|
|
||||||
|
@ -53,6 +53,7 @@ public class TransportConnector implements Connector {
|
||||||
protected TransportStatusDetector statusDector;
|
protected TransportStatusDetector statusDector;
|
||||||
|
|
||||||
private Broker broker;
|
private Broker broker;
|
||||||
|
private BrokerService brokerService;
|
||||||
private TransportServer server;
|
private TransportServer server;
|
||||||
private URI uri;
|
private URI uri;
|
||||||
private BrokerInfo brokerInfo = new BrokerInfo();
|
private BrokerInfo brokerInfo = new BrokerInfo();
|
||||||
|
@ -282,8 +283,12 @@ public class TransportConnector implements Connector {
|
||||||
if (broker == null) {
|
if (broker == null) {
|
||||||
throw new IllegalArgumentException("You must specify the broker property. Maybe this connector should be added to a broker?");
|
throw new IllegalArgumentException("You must specify the broker property. Maybe this connector should be added to a broker?");
|
||||||
}
|
}
|
||||||
|
if (brokerService != null) {
|
||||||
|
return TransportFactory.bind(brokerService, uri);
|
||||||
|
} else {
|
||||||
return TransportFactory.bind(broker.getBrokerId().getValue(), uri);
|
return TransportFactory.bind(broker.getBrokerId().getValue(), uri);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public DiscoveryAgent getDiscoveryAgent() throws IOException {
|
public DiscoveryAgent getDiscoveryAgent() throws IOException {
|
||||||
if (discoveryAgent == null) {
|
if (discoveryAgent == null) {
|
||||||
|
@ -375,4 +380,8 @@ public class TransportConnector implements Connector {
|
||||||
public void setEnableStatusMonitor(boolean enableStatusMonitor) {
|
public void setEnableStatusMonitor(boolean enableStatusMonitor) {
|
||||||
this.enableStatusMonitor = enableStatusMonitor;
|
this.enableStatusMonitor = enableStatusMonitor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setBrokerService(BrokerService brokerService) {
|
||||||
|
this.brokerService = brokerService;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -126,7 +126,7 @@ public class ProxyConnector implements Service {
|
||||||
if (bind == null) {
|
if (bind == null) {
|
||||||
throw new IllegalArgumentException("You must specify either a server or the bind property");
|
throw new IllegalArgumentException("You must specify either a server or the bind property");
|
||||||
}
|
}
|
||||||
return TransportFactory.bind(null, bind);
|
return TransportFactory.bind((String)null, bind);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Transport createRemoteTransport() throws Exception {
|
private Transport createRemoteTransport() throws Exception {
|
||||||
|
|
|
@ -26,6 +26,8 @@ import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
|
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.BrokerServiceAware;
|
||||||
import org.apache.activemq.util.FactoryFinder;
|
import org.apache.activemq.util.FactoryFinder;
|
||||||
import org.apache.activemq.util.IOExceptionSupport;
|
import org.apache.activemq.util.IOExceptionSupport;
|
||||||
import org.apache.activemq.util.IntrospectionSupport;
|
import org.apache.activemq.util.IntrospectionSupport;
|
||||||
|
@ -106,6 +108,14 @@ public abstract class TransportFactory {
|
||||||
return tf.doBind(brokerId, location);
|
return tf.doBind(brokerId, location);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static TransportServer bind(BrokerService brokerService, URI location) throws IOException {
|
||||||
|
TransportFactory tf = findTransportFactory(location);
|
||||||
|
if (tf instanceof BrokerServiceAware) {
|
||||||
|
((BrokerServiceAware)tf).setBrokerService(brokerService);
|
||||||
|
}
|
||||||
|
return tf.doBind(brokerService.getBrokerName(), location);
|
||||||
|
}
|
||||||
|
|
||||||
public Transport doConnect(URI location) throws Exception {
|
public Transport doConnect(URI location) throws Exception {
|
||||||
try {
|
try {
|
||||||
Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
|
Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
|
||||||
|
|
|
@ -0,0 +1,205 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.transport.stomp;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.io.StringReader;
|
||||||
|
import java.io.StringWriter;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
|
||||||
|
import org.apache.activemq.command.ActiveMQMapMessage;
|
||||||
|
import org.apache.activemq.command.ActiveMQMessage;
|
||||||
|
import org.apache.activemq.command.ActiveMQObjectMessage;
|
||||||
|
import org.springframework.beans.BeansException;
|
||||||
|
import org.springframework.context.ApplicationContext;
|
||||||
|
import org.springframework.context.ApplicationContextAware;
|
||||||
|
|
||||||
|
import com.thoughtworks.xstream.XStream;
|
||||||
|
import com.thoughtworks.xstream.io.HierarchicalStreamReader;
|
||||||
|
import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
|
||||||
|
import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver;
|
||||||
|
import com.thoughtworks.xstream.io.xml.PrettyPrintWriter;
|
||||||
|
import com.thoughtworks.xstream.io.xml.XppReader;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Frame translator implementation that uses XStream to convert messages to and
|
||||||
|
* from XML and JSON
|
||||||
|
*
|
||||||
|
* @author <a href="mailto:dejan@nighttale.net">Dejan Bosanac</a>
|
||||||
|
*/
|
||||||
|
public class JmsFrameTranslator extends LegacyFrameTranslator implements
|
||||||
|
ApplicationContextAware {
|
||||||
|
|
||||||
|
XStream xStream = null;
|
||||||
|
ApplicationContext applicationContext;
|
||||||
|
|
||||||
|
public ActiveMQMessage convertFrame(ProtocolConverter converter,
|
||||||
|
StompFrame command) throws JMSException, ProtocolException {
|
||||||
|
Map headers = command.getHeaders();
|
||||||
|
ActiveMQMessage msg;
|
||||||
|
String transformation = (String) headers.get(Stomp.Headers.TRANSFORMATION);
|
||||||
|
if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH) || transformation.equals(Stomp.Transformations.JMS_BYTE.toString())) {
|
||||||
|
msg = super.convertFrame(converter, command);
|
||||||
|
} else {
|
||||||
|
HierarchicalStreamReader in;
|
||||||
|
|
||||||
|
try {
|
||||||
|
String text = new String(command.getContent(), "UTF-8");
|
||||||
|
switch (Stomp.Transformations.getValue(transformation)) {
|
||||||
|
case JMS_OBJECT_XML:
|
||||||
|
in = new XppReader(new StringReader(text));
|
||||||
|
msg = createObjectMessage(in);
|
||||||
|
break;
|
||||||
|
case JMS_OBJECT_JSON:
|
||||||
|
in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
|
||||||
|
msg = createObjectMessage(in);
|
||||||
|
break;
|
||||||
|
case JMS_MAP_XML:
|
||||||
|
in = new XppReader(new StringReader(text));
|
||||||
|
msg = createMapMessage(in);
|
||||||
|
break;
|
||||||
|
case JMS_MAP_JSON:
|
||||||
|
in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
|
||||||
|
msg = createMapMessage(in);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new Exception("Unkown transformation: " + transformation);
|
||||||
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
|
command.getHeaders().put(Stomp.Headers.TRANSFORMATION_ERROR, e.getMessage());
|
||||||
|
msg = super.convertFrame(converter, command);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
|
||||||
|
return msg;
|
||||||
|
}
|
||||||
|
|
||||||
|
public StompFrame convertMessage(ProtocolConverter converter,
|
||||||
|
ActiveMQMessage message) throws IOException, JMSException {
|
||||||
|
if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) {
|
||||||
|
StompFrame command = new StompFrame();
|
||||||
|
command.setAction(Stomp.Responses.MESSAGE);
|
||||||
|
Map<String, String> headers = new HashMap<String, String>(25);
|
||||||
|
command.setHeaders(headers);
|
||||||
|
|
||||||
|
FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
|
||||||
|
converter, message, command, this);
|
||||||
|
ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy();
|
||||||
|
command.setContent(marshall(msg.getObject(),
|
||||||
|
headers.get(Stomp.Headers.TRANSFORMATION))
|
||||||
|
.getBytes("UTF-8"));
|
||||||
|
return command;
|
||||||
|
|
||||||
|
} else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
|
||||||
|
StompFrame command = new StompFrame();
|
||||||
|
command.setAction(Stomp.Responses.MESSAGE);
|
||||||
|
Map<String, String> headers = new HashMap<String, String>(25);
|
||||||
|
command.setHeaders(headers);
|
||||||
|
|
||||||
|
FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
|
||||||
|
converter, message, command, this);
|
||||||
|
ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
|
||||||
|
command.setContent(marshall((Serializable)msg.getContentMap(),
|
||||||
|
headers.get(Stomp.Headers.TRANSFORMATION))
|
||||||
|
.getBytes("UTF-8"));
|
||||||
|
return command;
|
||||||
|
} else {
|
||||||
|
return super.convertMessage(converter, message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Marshalls the Object to a string using XML or JSON encoding
|
||||||
|
*/
|
||||||
|
protected String marshall(Serializable object, String transformation)
|
||||||
|
throws JMSException {
|
||||||
|
StringWriter buffer = new StringWriter();
|
||||||
|
HierarchicalStreamWriter out;
|
||||||
|
if (transformation.toLowerCase().endsWith("json")) {
|
||||||
|
out = new JettisonMappedXmlDriver().createWriter(buffer);
|
||||||
|
} else {
|
||||||
|
out = new PrettyPrintWriter(buffer);
|
||||||
|
}
|
||||||
|
getXStream().marshal(object, out);
|
||||||
|
return buffer.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ActiveMQObjectMessage createObjectMessage(HierarchicalStreamReader in) throws JMSException {
|
||||||
|
ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage();
|
||||||
|
Object obj = getXStream().unmarshal(in);
|
||||||
|
objMsg.setObject((Serializable) obj);
|
||||||
|
return objMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException {
|
||||||
|
ActiveMQMapMessage mapMsg = new ActiveMQMapMessage();
|
||||||
|
Map<String, Object> map = (Map<String, Object>)getXStream().unmarshal(in);
|
||||||
|
for (String key : map.keySet()) {
|
||||||
|
mapMsg.setObject(key, map.get(key));
|
||||||
|
}
|
||||||
|
return mapMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
// Properties
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
public XStream getXStream() {
|
||||||
|
if (xStream == null) {
|
||||||
|
xStream = createXStream();
|
||||||
|
}
|
||||||
|
return xStream;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setXStream(XStream xStream) {
|
||||||
|
this.xStream = xStream;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Implementation methods
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
protected XStream createXStream() {
|
||||||
|
XStream xstream = null;
|
||||||
|
if (applicationContext != null) {
|
||||||
|
String[] names = applicationContext
|
||||||
|
.getBeanNamesForType(XStream.class);
|
||||||
|
for (int i = 0; i < names.length; i++) {
|
||||||
|
String name = names[i];
|
||||||
|
xstream = (XStream) applicationContext.getBean(name);
|
||||||
|
if (xstream != null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (xstream == null) {
|
||||||
|
System.out.println("is null");
|
||||||
|
xstream = new XStream();
|
||||||
|
}
|
||||||
|
return xstream;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setApplicationContext(ApplicationContext applicationContext)
|
||||||
|
throws BeansException {
|
||||||
|
this.applicationContext = applicationContext;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -57,6 +57,8 @@ import org.apache.activemq.util.IOExceptionSupport;
|
||||||
import org.apache.activemq.util.IdGenerator;
|
import org.apache.activemq.util.IdGenerator;
|
||||||
import org.apache.activemq.util.IntrospectionSupport;
|
import org.apache.activemq.util.IntrospectionSupport;
|
||||||
import org.apache.activemq.util.LongSequenceGenerator;
|
import org.apache.activemq.util.LongSequenceGenerator;
|
||||||
|
import org.springframework.context.ApplicationContext;
|
||||||
|
import org.springframework.context.ApplicationContextAware;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author <a href="http://hiramchirino.com">chirino</a>
|
* @author <a href="http://hiramchirino.com">chirino</a>
|
||||||
|
@ -86,10 +88,12 @@ public class ProtocolConverter {
|
||||||
private final AtomicBoolean connected = new AtomicBoolean(false);
|
private final AtomicBoolean connected = new AtomicBoolean(false);
|
||||||
private final FrameTranslator frameTranslator;
|
private final FrameTranslator frameTranslator;
|
||||||
private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/");
|
private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/");
|
||||||
|
private final ApplicationContext applicationContext;
|
||||||
|
|
||||||
public ProtocolConverter(StompTransportFilter stompTransportFilter, FrameTranslator translator) {
|
public ProtocolConverter(StompTransportFilter stompTransportFilter, FrameTranslator translator, ApplicationContext applicationContext) {
|
||||||
this.transportFilter = stompTransportFilter;
|
this.transportFilter = stompTransportFilter;
|
||||||
this.frameTranslator = translator;
|
this.frameTranslator = translator;
|
||||||
|
this.applicationContext = applicationContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected int generateCommandId() {
|
protected int generateCommandId() {
|
||||||
|
@ -140,6 +144,9 @@ public class ProtocolConverter {
|
||||||
if (header != null) {
|
if (header != null) {
|
||||||
translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER
|
translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER
|
||||||
.newInstance(header);
|
.newInstance(header);
|
||||||
|
if (translator instanceof ApplicationContextAware) {
|
||||||
|
((ApplicationContextAware)translator).setApplicationContext(applicationContext);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} catch (Exception ignore) {
|
} catch (Exception ignore) {
|
||||||
// if anything goes wrong use the default translator
|
// if anything goes wrong use the default translator
|
||||||
|
@ -554,9 +561,13 @@ public class ProtocolConverter {
|
||||||
return msg;
|
return msg;
|
||||||
}
|
}
|
||||||
|
|
||||||
public StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException {
|
public StompFrame convertMessage(ActiveMQMessage message, boolean ignoreTransformation) throws IOException, JMSException {
|
||||||
|
if (ignoreTransformation == true) {
|
||||||
|
return frameTranslator.convertMessage(this, message);
|
||||||
|
} else {
|
||||||
return findTranslator(message.getStringProperty(Stomp.Headers.TRANSFORMATION)).convertMessage(this, message);
|
return findTranslator(message.getStringProperty(Stomp.Headers.TRANSFORMATION)).convertMessage(this, message);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public StompTransportFilter getTransportFilter() {
|
public StompTransportFilter getTransportFilter() {
|
||||||
return transportFilter;
|
return transportFilter;
|
||||||
|
|
|
@ -49,6 +49,7 @@ public interface Stomp {
|
||||||
String TRANSACTION = "transaction";
|
String TRANSACTION = "transaction";
|
||||||
String CONTENT_LENGTH = "content-length";
|
String CONTENT_LENGTH = "content-length";
|
||||||
String TRANSFORMATION = "transformation";
|
String TRANSFORMATION = "transformation";
|
||||||
|
String TRANSFORMATION_ERROR = "transformation-error";
|
||||||
|
|
||||||
public interface Response {
|
public interface Response {
|
||||||
String RECEIPT_ID = "receipt-id";
|
String RECEIPT_ID = "receipt-id";
|
||||||
|
@ -114,4 +115,16 @@ public interface Stomp {
|
||||||
String MESSAGE_ID = "message-id";
|
String MESSAGE_ID = "message-id";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public enum Transformations {
|
||||||
|
JMS_BYTE, JMS_OBJECT_XML, JMS_OBJECT_JSON, JMS_MAP_XML, JMS_MAP_JSON;
|
||||||
|
|
||||||
|
public String toString() {
|
||||||
|
return name().replaceAll("_", "-").toLowerCase();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Transformations getValue(String value) {
|
||||||
|
return valueOf(value.replaceAll("-", "_").toUpperCase());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,25 +18,38 @@ package org.apache.activemq.transport.stomp;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.BrokerServiceAware;
|
||||||
import org.apache.activemq.transport.Transport;
|
import org.apache.activemq.transport.Transport;
|
||||||
import org.apache.activemq.transport.tcp.SslTransportFactory;
|
import org.apache.activemq.transport.tcp.SslTransportFactory;
|
||||||
import org.apache.activemq.util.IntrospectionSupport;
|
import org.apache.activemq.util.IntrospectionSupport;
|
||||||
import org.apache.activemq.wireformat.WireFormat;
|
import org.apache.activemq.wireformat.WireFormat;
|
||||||
|
import org.apache.activemq.xbean.XBeanBrokerService;
|
||||||
|
import org.springframework.context.ApplicationContext;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A <a href="http://stomp.codehaus.org/">STOMP</a> over SSL transport factory
|
* A <a href="http://stomp.codehaus.org/">STOMP</a> over SSL transport factory
|
||||||
*
|
*
|
||||||
* @version $Revision$
|
* @version $Revision$
|
||||||
*/
|
*/
|
||||||
public class StompSslTransportFactory extends SslTransportFactory {
|
public class StompSslTransportFactory extends SslTransportFactory implements BrokerServiceAware {
|
||||||
|
|
||||||
|
private ApplicationContext applicationContext = null;
|
||||||
|
|
||||||
protected String getDefaultWireFormatType() {
|
protected String getDefaultWireFormatType() {
|
||||||
return "stomp";
|
return "stomp";
|
||||||
}
|
}
|
||||||
|
|
||||||
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
|
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
|
||||||
transport = new StompTransportFilter(transport, new LegacyFrameTranslator());
|
transport = new StompTransportFilter(transport, new LegacyFrameTranslator(), applicationContext);
|
||||||
IntrospectionSupport.setProperties(transport, options);
|
IntrospectionSupport.setProperties(transport, options);
|
||||||
return super.compositeConfigure(transport, format, options);
|
return super.compositeConfigure(transport, format, options);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setBrokerService(BrokerService brokerService) {
|
||||||
|
if (brokerService instanceof XBeanBrokerService) {
|
||||||
|
this.applicationContext = ((XBeanBrokerService)brokerService).getApplicationContext();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,11 +70,18 @@ public class StompSubscription {
|
||||||
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
|
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
|
||||||
protocolConverter.getTransportFilter().sendToActiveMQ(ack);
|
protocolConverter.getTransportFilter().sendToActiveMQ(ack);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boolean ignoreTransformation = false;
|
||||||
|
|
||||||
if (transformation != null) {
|
if (transformation != null) {
|
||||||
message.setReadOnlyProperties(false);
|
message.setReadOnlyProperties(false);
|
||||||
message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation);
|
message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation);
|
||||||
|
} else {
|
||||||
|
if (message.getStringProperty(Stomp.Headers.TRANSFORMATION) != null) {
|
||||||
|
ignoreTransformation = true;
|
||||||
}
|
}
|
||||||
StompFrame command = protocolConverter.convertMessage(message);
|
}
|
||||||
|
StompFrame command = protocolConverter.convertMessage(message, ignoreTransformation);
|
||||||
|
|
||||||
command.setAction(Stomp.Responses.MESSAGE);
|
command.setAction(Stomp.Responses.MESSAGE);
|
||||||
if (subscriptionId != null) {
|
if (subscriptionId != null) {
|
||||||
|
|
|
@ -18,24 +18,30 @@ package org.apache.activemq.transport.stomp;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.BrokerServiceAware;
|
||||||
import org.apache.activemq.transport.Transport;
|
import org.apache.activemq.transport.Transport;
|
||||||
import org.apache.activemq.transport.tcp.TcpTransportFactory;
|
import org.apache.activemq.transport.tcp.TcpTransportFactory;
|
||||||
import org.apache.activemq.util.IntrospectionSupport;
|
import org.apache.activemq.util.IntrospectionSupport;
|
||||||
import org.apache.activemq.wireformat.WireFormat;
|
import org.apache.activemq.wireformat.WireFormat;
|
||||||
|
import org.apache.activemq.xbean.XBeanBrokerService;
|
||||||
|
import org.springframework.context.ApplicationContext;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A <a href="http://stomp.codehaus.org/">STOMP</a> transport factory
|
* A <a href="http://stomp.codehaus.org/">STOMP</a> transport factory
|
||||||
*
|
*
|
||||||
* @version $Revision: 1.1.1.1 $
|
* @version $Revision: 1.1.1.1 $
|
||||||
*/
|
*/
|
||||||
public class StompTransportFactory extends TcpTransportFactory {
|
public class StompTransportFactory extends TcpTransportFactory implements BrokerServiceAware {
|
||||||
|
|
||||||
|
private ApplicationContext applicationContext = null;
|
||||||
|
|
||||||
protected String getDefaultWireFormatType() {
|
protected String getDefaultWireFormatType() {
|
||||||
return "stomp";
|
return "stomp";
|
||||||
}
|
}
|
||||||
|
|
||||||
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
|
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
|
||||||
transport = new StompTransportFilter(transport, new LegacyFrameTranslator());
|
transport = new StompTransportFilter(transport, new LegacyFrameTranslator(), applicationContext);
|
||||||
IntrospectionSupport.setProperties(transport, options);
|
IntrospectionSupport.setProperties(transport, options);
|
||||||
return super.compositeConfigure(transport, format, options);
|
return super.compositeConfigure(transport, format, options);
|
||||||
}
|
}
|
||||||
|
@ -45,4 +51,10 @@ public class StompTransportFactory extends TcpTransportFactory {
|
||||||
// packets
|
// packets
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setBrokerService(BrokerService brokerService) {
|
||||||
|
if (brokerService instanceof XBeanBrokerService) {
|
||||||
|
this.applicationContext = ((XBeanBrokerService)brokerService).getApplicationContext();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.activemq.transport.TransportFilter;
|
||||||
import org.apache.activemq.util.IOExceptionSupport;
|
import org.apache.activemq.util.IOExceptionSupport;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.springframework.context.ApplicationContext;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The StompTransportFilter normally sits on top of a TcpTransport that has been
|
* The StompTransportFilter normally sits on top of a TcpTransport that has been
|
||||||
|
@ -46,10 +47,10 @@ public class StompTransportFilter extends TransportFilter {
|
||||||
|
|
||||||
private boolean trace;
|
private boolean trace;
|
||||||
|
|
||||||
public StompTransportFilter(Transport next, FrameTranslator translator) {
|
public StompTransportFilter(Transport next, FrameTranslator translator, ApplicationContext applicationContext) {
|
||||||
super(next);
|
super(next);
|
||||||
this.frameTranslator = translator;
|
this.frameTranslator = translator;
|
||||||
this.protocolConverter = new ProtocolConverter(this, translator);
|
this.protocolConverter = new ProtocolConverter(this, translator, applicationContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void oneway(Object o) throws IOException {
|
public void oneway(Object o) throws IOException {
|
||||||
|
|
|
@ -1,138 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.transport.stomp;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.Serializable;
|
|
||||||
import java.io.StringReader;
|
|
||||||
import java.io.StringWriter;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import javax.jms.JMSException;
|
|
||||||
import javax.jms.ObjectMessage;
|
|
||||||
import javax.jms.Session;
|
|
||||||
import javax.jms.TextMessage;
|
|
||||||
|
|
||||||
import org.apache.activemq.command.ActiveMQBytesMessage;
|
|
||||||
import org.apache.activemq.command.ActiveMQMessage;
|
|
||||||
import org.apache.activemq.command.ActiveMQObjectMessage;
|
|
||||||
import org.apache.activemq.command.ActiveMQTextMessage;
|
|
||||||
|
|
||||||
import com.thoughtworks.xstream.XStream;
|
|
||||||
import com.thoughtworks.xstream.io.HierarchicalStreamReader;
|
|
||||||
import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
|
|
||||||
import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver;
|
|
||||||
import com.thoughtworks.xstream.io.xml.PrettyPrintWriter;
|
|
||||||
import com.thoughtworks.xstream.io.xml.XppReader;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Frame translator implementation that uses XStream to convert messages to and from XML and JSON
|
|
||||||
* @author <a href="mailto:dejan@nighttale.net">Dejan Bosanac</a>
|
|
||||||
*/
|
|
||||||
public class XStreamFrameTranslator extends LegacyFrameTranslator {
|
|
||||||
|
|
||||||
XStream xStream = new XStream();
|
|
||||||
|
|
||||||
public ActiveMQMessage convertFrame(ProtocolConverter converter,
|
|
||||||
StompFrame command) throws JMSException, ProtocolException {
|
|
||||||
Map headers = command.getHeaders();
|
|
||||||
ActiveMQMessage msg;
|
|
||||||
if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) {
|
|
||||||
msg = super.convertFrame(converter, command);
|
|
||||||
} else {
|
|
||||||
try {
|
|
||||||
ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage();
|
|
||||||
Object obj = unmarshall(new String(command.getContent(), "UTF-8"), (String)headers.get(Stomp.Headers.TRANSFORMATION));
|
|
||||||
objMsg.setObject((Serializable)obj);
|
|
||||||
msg = objMsg;
|
|
||||||
} catch (Throwable e) {
|
|
||||||
msg = super.convertFrame(converter, command);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
|
|
||||||
return msg;
|
|
||||||
}
|
|
||||||
|
|
||||||
public StompFrame convertMessage(ProtocolConverter converter,
|
|
||||||
ActiveMQMessage message) throws IOException, JMSException {
|
|
||||||
if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) {
|
|
||||||
StompFrame command = new StompFrame();
|
|
||||||
command.setAction(Stomp.Responses.MESSAGE);
|
|
||||||
Map<String, String> headers = new HashMap<String, String>(25);
|
|
||||||
command.setHeaders(headers);
|
|
||||||
|
|
||||||
FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(converter, message, command, this);
|
|
||||||
ActiveMQObjectMessage msg = (ActiveMQObjectMessage)message.copy();
|
|
||||||
command.setContent(marshall(msg.getObject(), headers.get(Stomp.Headers.TRANSFORMATION)).getBytes("UTF-8"));
|
|
||||||
return command;
|
|
||||||
|
|
||||||
} else {
|
|
||||||
return super.convertMessage(converter, message);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Marshalls the Object to a string using XML or JSON
|
|
||||||
* encoding
|
|
||||||
*/
|
|
||||||
protected String marshall(Serializable object, String transformation) throws JMSException {
|
|
||||||
StringWriter buffer = new StringWriter();
|
|
||||||
HierarchicalStreamWriter out;
|
|
||||||
if (transformation.equalsIgnoreCase("jms-json")) {
|
|
||||||
out = new JettisonMappedXmlDriver().createWriter(buffer);
|
|
||||||
} else {
|
|
||||||
out = new PrettyPrintWriter(buffer);
|
|
||||||
}
|
|
||||||
getXStream().marshal(object, out);
|
|
||||||
return buffer.toString();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Unmarshalls the XML or JSON encoded message to an
|
|
||||||
* Object
|
|
||||||
*/
|
|
||||||
protected Object unmarshall(String text, String transformation) {
|
|
||||||
HierarchicalStreamReader in;
|
|
||||||
if (transformation.equalsIgnoreCase("jms-json")) {
|
|
||||||
in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
|
|
||||||
} else {
|
|
||||||
in = new XppReader(new StringReader(text));
|
|
||||||
}
|
|
||||||
return getXStream().unmarshal(in);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Properties
|
|
||||||
// -------------------------------------------------------------------------
|
|
||||||
public XStream getXStream() {
|
|
||||||
if (xStream == null) {
|
|
||||||
xStream = createXStream();
|
|
||||||
}
|
|
||||||
return xStream;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setXStream(XStream xStream) {
|
|
||||||
this.xStream = xStream;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Implementation methods
|
|
||||||
// -------------------------------------------------------------------------
|
|
||||||
protected XStream createXStream() {
|
|
||||||
return new XStream();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -0,0 +1,123 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.util;
|
||||||
|
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.springframework.beans.factory.FactoryBean;
|
||||||
|
import org.springframework.beans.propertyeditors.ClassEditor;
|
||||||
|
import org.springframework.util.Assert;
|
||||||
|
|
||||||
|
import com.thoughtworks.xstream.XStream;
|
||||||
|
import com.thoughtworks.xstream.annotations.Annotations;
|
||||||
|
import com.thoughtworks.xstream.converters.Converter;
|
||||||
|
import com.thoughtworks.xstream.converters.ConverterMatcher;
|
||||||
|
import com.thoughtworks.xstream.converters.SingleValueConverter;
|
||||||
|
|
||||||
|
public class XStreamFactoryBean implements FactoryBean {
|
||||||
|
|
||||||
|
XStream xstream = new XStream();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the <code>Converters</code> or <code>SingleValueConverters</code> to be registered with the
|
||||||
|
* <code>XStream</code> instance.
|
||||||
|
*
|
||||||
|
* @see Converter
|
||||||
|
* @see SingleValueConverter
|
||||||
|
*/
|
||||||
|
public void setConverters(ConverterMatcher[] converters) {
|
||||||
|
for (int i = 0; i < converters.length; i++) {
|
||||||
|
if (converters[i] instanceof Converter) {
|
||||||
|
xstream.registerConverter((Converter) converters[i], i);
|
||||||
|
}
|
||||||
|
else if (converters[i] instanceof SingleValueConverter) {
|
||||||
|
xstream.registerConverter((SingleValueConverter) converters[i], i);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
throw new IllegalArgumentException("Invalid ConverterMatcher [" + converters[i] + "]");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set a alias/type map, consisting of string aliases mapped to <code>Class</code> instances (or Strings to be
|
||||||
|
* converted to <code>Class</code> instances).
|
||||||
|
*
|
||||||
|
* @see org.springframework.beans.propertyeditors.ClassEditor
|
||||||
|
*/
|
||||||
|
public void setAliases(Map aliases) {
|
||||||
|
for (Iterator iterator = aliases.entrySet().iterator(); iterator.hasNext();) {
|
||||||
|
Map.Entry entry = (Map.Entry) iterator.next();
|
||||||
|
// Check whether we need to convert from String to Class.
|
||||||
|
Class type;
|
||||||
|
if (entry.getValue() instanceof Class) {
|
||||||
|
type = (Class) entry.getValue();
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
ClassEditor editor = new ClassEditor();
|
||||||
|
editor.setAsText(String.valueOf(entry.getValue()));
|
||||||
|
type = (Class) editor.getValue();
|
||||||
|
}
|
||||||
|
xstream.alias((String) entry.getKey(), type);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the XStream mode.
|
||||||
|
*
|
||||||
|
* @see XStream#XPATH_REFERENCES
|
||||||
|
* @see XStream#ID_REFERENCES
|
||||||
|
* @see XStream#NO_REFERENCES
|
||||||
|
*/
|
||||||
|
public void setMode(int mode) {
|
||||||
|
xstream.setMode(mode);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the classes, for which mappings will be read from class-level JDK 1.5+ annotation metadata.
|
||||||
|
*
|
||||||
|
* @see Annotations#configureAliases(XStream, Class[])
|
||||||
|
*/
|
||||||
|
public void setAnnotatedClass(Class<?> annotatedClass) {
|
||||||
|
Assert.notNull(annotatedClass, "'annotatedClass' must not be null");
|
||||||
|
Annotations.configureAliases(xstream, annotatedClass);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets annotated classes, for which aliases will be read from class-level JDK 1.5+ annotation metadata.
|
||||||
|
*
|
||||||
|
* @see Annotations#configureAliases(XStream, Class[])
|
||||||
|
*/
|
||||||
|
public void setAnnotatedClasses(Class<?>[] annotatedClasses) {
|
||||||
|
Assert.notEmpty(annotatedClasses, "'annotatedClasses' must not be empty");
|
||||||
|
Annotations.configureAliases(xstream, annotatedClasses);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Object getObject() throws Exception {
|
||||||
|
return xstream;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Class getObjectType() {
|
||||||
|
return XStream.class;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isSingleton() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -29,6 +29,7 @@ import org.apache.xbean.spring.context.ResourceXmlApplicationContext;
|
||||||
import org.apache.xbean.spring.context.impl.URIEditor;
|
import org.apache.xbean.spring.context.impl.URIEditor;
|
||||||
import org.springframework.beans.BeansException;
|
import org.springframework.beans.BeansException;
|
||||||
import org.springframework.context.ApplicationContext;
|
import org.springframework.context.ApplicationContext;
|
||||||
|
import org.springframework.context.ApplicationContextAware;
|
||||||
import org.springframework.core.io.ClassPathResource;
|
import org.springframework.core.io.ClassPathResource;
|
||||||
import org.springframework.core.io.FileSystemResource;
|
import org.springframework.core.io.FileSystemResource;
|
||||||
import org.springframework.core.io.Resource;
|
import org.springframework.core.io.Resource;
|
||||||
|
@ -71,6 +72,10 @@ public class XBeanBrokerFactory implements BrokerFactoryHandler {
|
||||||
throw new IllegalArgumentException("The configuration has no BrokerService instance for resource: " + config);
|
throw new IllegalArgumentException("The configuration has no BrokerService instance for resource: " + config);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (broker instanceof ApplicationContextAware) {
|
||||||
|
((ApplicationContextAware)broker).setApplicationContext(context);
|
||||||
|
}
|
||||||
|
|
||||||
// TODO warning resources from the context may not be closed down!
|
// TODO warning resources from the context may not be closed down!
|
||||||
|
|
||||||
return broker;
|
return broker;
|
||||||
|
|
|
@ -17,8 +17,11 @@
|
||||||
package org.apache.activemq.xbean;
|
package org.apache.activemq.xbean;
|
||||||
|
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.springframework.beans.BeansException;
|
||||||
import org.springframework.beans.factory.DisposableBean;
|
import org.springframework.beans.factory.DisposableBean;
|
||||||
import org.springframework.beans.factory.InitializingBean;
|
import org.springframework.beans.factory.InitializingBean;
|
||||||
|
import org.springframework.context.ApplicationContext;
|
||||||
|
import org.springframework.context.ApplicationContextAware;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An ActiveMQ Message Broker. It consists of a number of transport
|
* An ActiveMQ Message Broker. It consists of a number of transport
|
||||||
|
@ -34,9 +37,10 @@ import org.springframework.beans.factory.InitializingBean;
|
||||||
* {code}
|
* {code}
|
||||||
* @version $Revision: 1.1 $
|
* @version $Revision: 1.1 $
|
||||||
*/
|
*/
|
||||||
public class XBeanBrokerService extends BrokerService implements InitializingBean, DisposableBean {
|
public class XBeanBrokerService extends BrokerService implements InitializingBean, DisposableBean, ApplicationContextAware {
|
||||||
|
|
||||||
private boolean start = true;
|
private boolean start = true;
|
||||||
|
private ApplicationContext applicationContext = null;
|
||||||
|
|
||||||
public XBeanBrokerService() {
|
public XBeanBrokerService() {
|
||||||
}
|
}
|
||||||
|
@ -63,4 +67,15 @@ public class XBeanBrokerService extends BrokerService implements InitializingBea
|
||||||
public void setStart(boolean start) {
|
public void setStart(boolean start) {
|
||||||
this.start = start;
|
this.start = start;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setApplicationContext(ApplicationContext applicationContext)
|
||||||
|
throws BeansException {
|
||||||
|
this.applicationContext = applicationContext;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ApplicationContext getApplicationContext() {
|
||||||
|
return applicationContext;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,17 @@
|
||||||
|
## ---------------------------------------------------------------------------
|
||||||
|
## Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
## contributor license agreements. See the NOTICE file distributed with
|
||||||
|
## this work for additional information regarding copyright ownership.
|
||||||
|
## The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
## (the "License"); you may not use this file except in compliance with
|
||||||
|
## the License. You may obtain a copy of the License at
|
||||||
|
##
|
||||||
|
## http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
##
|
||||||
|
## Unless required by applicable law or agreed to in writing, software
|
||||||
|
## distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
## See the License for the specific language governing permissions and
|
||||||
|
## limitations under the License.
|
||||||
|
## ---------------------------------------------------------------------------
|
||||||
|
class=org.apache.activemq.transport.stomp.JmsFrameTranslator
|
|
@ -1,17 +0,0 @@
|
||||||
## ---------------------------------------------------------------------------
|
|
||||||
## Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
## contributor license agreements. See the NOTICE file distributed with
|
|
||||||
## this work for additional information regarding copyright ownership.
|
|
||||||
## The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
## (the "License"); you may not use this file except in compliance with
|
|
||||||
## the License. You may obtain a copy of the License at
|
|
||||||
##
|
|
||||||
## http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
##
|
|
||||||
## Unless required by applicable law or agreed to in writing, software
|
|
||||||
## distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
## See the License for the specific language governing permissions and
|
|
||||||
## limitations under the License.
|
|
||||||
## ---------------------------------------------------------------------------
|
|
||||||
class=org.apache.activemq.transport.stomp.XStreamFrameTranslator
|
|
|
@ -0,0 +1,17 @@
|
||||||
|
## ---------------------------------------------------------------------------
|
||||||
|
## Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
## contributor license agreements. See the NOTICE file distributed with
|
||||||
|
## this work for additional information regarding copyright ownership.
|
||||||
|
## The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
## (the "License"); you may not use this file except in compliance with
|
||||||
|
## the License. You may obtain a copy of the License at
|
||||||
|
##
|
||||||
|
## http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
##
|
||||||
|
## Unless required by applicable law or agreed to in writing, software
|
||||||
|
## distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
## See the License for the specific language governing permissions and
|
||||||
|
## limitations under the License.
|
||||||
|
## ---------------------------------------------------------------------------
|
||||||
|
class=org.apache.activemq.transport.stomp.JmsFrameTranslator
|
|
@ -0,0 +1,17 @@
|
||||||
|
## ---------------------------------------------------------------------------
|
||||||
|
## Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
## contributor license agreements. See the NOTICE file distributed with
|
||||||
|
## this work for additional information regarding copyright ownership.
|
||||||
|
## The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
## (the "License"); you may not use this file except in compliance with
|
||||||
|
## the License. You may obtain a copy of the License at
|
||||||
|
##
|
||||||
|
## http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
##
|
||||||
|
## Unless required by applicable law or agreed to in writing, software
|
||||||
|
## distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
## See the License for the specific language governing permissions and
|
||||||
|
## limitations under the License.
|
||||||
|
## ---------------------------------------------------------------------------
|
||||||
|
class=org.apache.activemq.transport.stomp.JmsFrameTranslator
|
|
@ -0,0 +1,17 @@
|
||||||
|
## ---------------------------------------------------------------------------
|
||||||
|
## Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
## contributor license agreements. See the NOTICE file distributed with
|
||||||
|
## this work for additional information regarding copyright ownership.
|
||||||
|
## The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
## (the "License"); you may not use this file except in compliance with
|
||||||
|
## the License. You may obtain a copy of the License at
|
||||||
|
##
|
||||||
|
## http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
##
|
||||||
|
## Unless required by applicable law or agreed to in writing, software
|
||||||
|
## distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
## See the License for the specific language governing permissions and
|
||||||
|
## limitations under the License.
|
||||||
|
## ---------------------------------------------------------------------------
|
||||||
|
class=org.apache.activemq.transport.stomp.JmsFrameTranslator
|
|
@ -0,0 +1,17 @@
|
||||||
|
## ---------------------------------------------------------------------------
|
||||||
|
## Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
## contributor license agreements. See the NOTICE file distributed with
|
||||||
|
## this work for additional information regarding copyright ownership.
|
||||||
|
## The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
## (the "License"); you may not use this file except in compliance with
|
||||||
|
## the License. You may obtain a copy of the License at
|
||||||
|
##
|
||||||
|
## http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
##
|
||||||
|
## Unless required by applicable law or agreed to in writing, software
|
||||||
|
## distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
## See the License for the specific language governing permissions and
|
||||||
|
## limitations under the License.
|
||||||
|
## ---------------------------------------------------------------------------
|
||||||
|
class=org.apache.activemq.transport.stomp.JmsFrameTranslator
|
|
@ -1,17 +0,0 @@
|
||||||
## ---------------------------------------------------------------------------
|
|
||||||
## Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
## contributor license agreements. See the NOTICE file distributed with
|
|
||||||
## this work for additional information regarding copyright ownership.
|
|
||||||
## The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
## (the "License"); you may not use this file except in compliance with
|
|
||||||
## the License. You may obtain a copy of the License at
|
|
||||||
##
|
|
||||||
## http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
##
|
|
||||||
## Unless required by applicable law or agreed to in writing, software
|
|
||||||
## distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
## See the License for the specific language governing permissions and
|
|
||||||
## limitations under the License.
|
|
||||||
## ---------------------------------------------------------------------------
|
|
||||||
class=org.apache.activemq.transport.stomp.XStreamFrameTranslator
|
|
|
@ -18,8 +18,13 @@ package org.apache.activemq.transport.stomp;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
import com.thoughtworks.xstream.annotations.XStreamAlias;
|
||||||
|
|
||||||
|
@XStreamAlias("pojo")
|
||||||
public class SamplePojo implements Serializable {
|
public class SamplePojo implements Serializable {
|
||||||
|
@XStreamAlias("name")
|
||||||
private String name;
|
private String name;
|
||||||
|
@XStreamAlias("city")
|
||||||
private String city;
|
private String city;
|
||||||
|
|
||||||
public SamplePojo() {
|
public SamplePojo() {
|
||||||
|
|
|
@ -28,6 +28,8 @@ import java.util.regex.Pattern;
|
||||||
import javax.jms.BytesMessage;
|
import javax.jms.BytesMessage;
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.MapMessage;
|
||||||
|
import javax.jms.Message;
|
||||||
import javax.jms.MessageConsumer;
|
import javax.jms.MessageConsumer;
|
||||||
import javax.jms.MessageProducer;
|
import javax.jms.MessageProducer;
|
||||||
import javax.jms.ObjectMessage;
|
import javax.jms.ObjectMessage;
|
||||||
|
@ -54,16 +56,34 @@ public class StompTest extends CombinationTestSupport {
|
||||||
private Connection connection;
|
private Connection connection;
|
||||||
private Session session;
|
private Session session;
|
||||||
private ActiveMQQueue queue;
|
private ActiveMQQueue queue;
|
||||||
private String xmlText = "<org.apache.activemq.transport.stomp.SamplePojo>\n"
|
private String xmlObject = "<pojo>\n"
|
||||||
+ " <name>Dejan</name>\n"
|
+ " <name>Dejan</name>\n"
|
||||||
+ " <city>Belgrade</city>\n"
|
+ " <city>Belgrade</city>\n"
|
||||||
+ "</org.apache.activemq.transport.stomp.SamplePojo>";
|
+ "</pojo>";
|
||||||
|
|
||||||
private String jsonText = "{\"org.apache.activemq.transport.stomp.SamplePojo\":{"
|
private String xmlMap = "<map>\n"
|
||||||
|
+ " <entry>\n"
|
||||||
|
+ " <string>name</string>\n"
|
||||||
|
+ " <string>Dejan</string>\n"
|
||||||
|
+ " </entry>\n"
|
||||||
|
+ " <entry>\n"
|
||||||
|
+ " <string>city</string>\n"
|
||||||
|
+ " <string>Belgrade</string>\n"
|
||||||
|
+ " </entry>\n"
|
||||||
|
+ "</map>\n";
|
||||||
|
|
||||||
|
private String jsonObject = "{\"pojo\":{"
|
||||||
+ "\"name\":\"Dejan\","
|
+ "\"name\":\"Dejan\","
|
||||||
+ "\"city\":\"Belgrade\""
|
+ "\"city\":\"Belgrade\""
|
||||||
+ "}}";
|
+ "}}";
|
||||||
|
|
||||||
|
private String jsonMap = "{\"map\":{"
|
||||||
|
+ "\"entry\":["
|
||||||
|
+ "{\"string\":[\"name\",\"Dejan\"]},"
|
||||||
|
+ "{\"string\":[\"city\",\"Belgrade\"]}"
|
||||||
|
+ "]"
|
||||||
|
+ "}}";
|
||||||
|
|
||||||
protected void setUp() throws Exception {
|
protected void setUp() throws Exception {
|
||||||
broker = BrokerFactory.createBroker(new URI(confUri));
|
broker = BrokerFactory.createBroker(new URI(confUri));
|
||||||
broker.start();
|
broker.start();
|
||||||
|
@ -301,7 +321,6 @@ public class StompTest extends CombinationTestSupport {
|
||||||
frame = stompConnection.receiveFrame();
|
frame = stompConnection.receiveFrame();
|
||||||
assertTrue(frame.startsWith("MESSAGE"));
|
assertTrue(frame.startsWith("MESSAGE"));
|
||||||
|
|
||||||
// System.out.println("out: "+frame);
|
|
||||||
|
|
||||||
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
|
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
|
||||||
stompConnection.sendFrame(frame);
|
stompConnection.sendFrame(frame);
|
||||||
|
@ -590,16 +609,17 @@ public class StompTest extends CombinationTestSupport {
|
||||||
frame = stompConnection.receiveFrame();
|
frame = stompConnection.receiveFrame();
|
||||||
assertTrue(frame.startsWith("CONNECTED"));
|
assertTrue(frame.startsWith("CONNECTED"));
|
||||||
|
|
||||||
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:jms-xml" + "\n\n" + "Hello World" + Stomp.NULL;
|
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + "Hello World" + Stomp.NULL;
|
||||||
|
|
||||||
stompConnection.sendFrame(frame);
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
TextMessage message = (TextMessage)consumer.receive(1000);
|
TextMessage message = (TextMessage)consumer.receive(1000);
|
||||||
assertNotNull(message);
|
assertNotNull(message);
|
||||||
|
assertNotNull(message.getStringProperty(Stomp.Headers.TRANSFORMATION_ERROR));
|
||||||
assertEquals("Hello World", message.getText());
|
assertEquals("Hello World", message.getText());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testTransformationSendXML() throws Exception {
|
public void testTransformationSendXMLObject() throws Exception {
|
||||||
MessageConsumer consumer = session.createConsumer(queue);
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
|
||||||
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||||
|
@ -608,7 +628,7 @@ public class StompTest extends CombinationTestSupport {
|
||||||
frame = stompConnection.receiveFrame();
|
frame = stompConnection.receiveFrame();
|
||||||
assertTrue(frame.startsWith("CONNECTED"));
|
assertTrue(frame.startsWith("CONNECTED"));
|
||||||
|
|
||||||
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:jms-xml" + "\n\n" + xmlText + Stomp.NULL;
|
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + xmlObject + Stomp.NULL;
|
||||||
|
|
||||||
stompConnection.sendFrame(frame);
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
@ -618,7 +638,7 @@ public class StompTest extends CombinationTestSupport {
|
||||||
assertEquals("Dejan", object.getName());
|
assertEquals("Dejan", object.getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testTransformationSendJSON() throws Exception {
|
public void testTransformationSendJSONObject() throws Exception {
|
||||||
MessageConsumer consumer = session.createConsumer(queue);
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
|
||||||
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||||
|
@ -627,7 +647,7 @@ public class StompTest extends CombinationTestSupport {
|
||||||
frame = stompConnection.receiveFrame();
|
frame = stompConnection.receiveFrame();
|
||||||
assertTrue(frame.startsWith("CONNECTED"));
|
assertTrue(frame.startsWith("CONNECTED"));
|
||||||
|
|
||||||
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:jms-json" + "\n\n" + jsonText + Stomp.NULL;
|
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_JSON + "\n\n" + jsonObject + Stomp.NULL;
|
||||||
|
|
||||||
stompConnection.sendFrame(frame);
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
@ -649,18 +669,18 @@ public class StompTest extends CombinationTestSupport {
|
||||||
frame = stompConnection.receiveFrame();
|
frame = stompConnection.receiveFrame();
|
||||||
assertTrue(frame.startsWith("CONNECTED"));
|
assertTrue(frame.startsWith("CONNECTED"));
|
||||||
|
|
||||||
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:jms-xml" + "\n\n" + Stomp.NULL;
|
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + Stomp.NULL;
|
||||||
stompConnection.sendFrame(frame);
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
frame = stompConnection.receiveFrame();
|
frame = stompConnection.receiveFrame();
|
||||||
|
|
||||||
assertTrue(frame.trim().endsWith(xmlText));
|
assertTrue(frame.trim().endsWith(xmlObject));
|
||||||
|
|
||||||
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
|
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
|
||||||
stompConnection.sendFrame(frame);
|
stompConnection.sendFrame(frame);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testTransformationReceiveJSON() throws Exception {
|
public void testTransformationReceiveJSONObject() throws Exception {
|
||||||
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
|
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
|
||||||
ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
|
ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
|
||||||
producer.send(message);
|
producer.send(message);
|
||||||
|
@ -671,22 +691,67 @@ public class StompTest extends CombinationTestSupport {
|
||||||
frame = stompConnection.receiveFrame();
|
frame = stompConnection.receiveFrame();
|
||||||
assertTrue(frame.startsWith("CONNECTED"));
|
assertTrue(frame.startsWith("CONNECTED"));
|
||||||
|
|
||||||
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:jms-json" + "\n\n" + Stomp.NULL;
|
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_JSON + "\n\n" + Stomp.NULL;
|
||||||
stompConnection.sendFrame(frame);
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
frame = stompConnection.receiveFrame();
|
frame = stompConnection.receiveFrame();
|
||||||
|
|
||||||
assertTrue(frame.trim().endsWith(jsonText));
|
assertTrue(frame.trim().endsWith(jsonObject));
|
||||||
|
|
||||||
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
|
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
|
||||||
stompConnection.sendFrame(frame);
|
stompConnection.sendFrame(frame);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testTransformationReceiveXML() throws Exception {
|
public void testTransformationReceiveXMLObject() throws Exception {
|
||||||
|
|
||||||
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
|
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
|
||||||
ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
|
ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
|
||||||
message.setStringProperty("transformation", "jms-xml");
|
producer.send(message);
|
||||||
|
|
||||||
|
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
frame = stompConnection.receiveFrame();
|
||||||
|
assertTrue(frame.startsWith("CONNECTED"));
|
||||||
|
|
||||||
|
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
frame = stompConnection.receiveFrame();
|
||||||
|
|
||||||
|
assertTrue(frame.trim().endsWith(xmlObject));
|
||||||
|
|
||||||
|
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testTransformationNotOverrideSubscription() throws Exception {
|
||||||
|
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
|
||||||
|
ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
|
||||||
|
message.setStringProperty("transformation", Stomp.Transformations.JMS_OBJECT_XML.toString());
|
||||||
|
producer.send(message);
|
||||||
|
|
||||||
|
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
frame = stompConnection.receiveFrame();
|
||||||
|
assertTrue(frame.startsWith("CONNECTED"));
|
||||||
|
|
||||||
|
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_JSON + "\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
frame = stompConnection.receiveFrame();
|
||||||
|
|
||||||
|
assertTrue(frame.trim().endsWith(jsonObject));
|
||||||
|
|
||||||
|
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testTransformationIgnoreTransformation() throws Exception {
|
||||||
|
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
|
||||||
|
ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
|
||||||
|
message.setStringProperty("transformation", Stomp.Transformations.JMS_OBJECT_XML.toString());
|
||||||
producer.send(message);
|
producer.send(message);
|
||||||
|
|
||||||
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||||
|
@ -700,16 +765,54 @@ public class StompTest extends CombinationTestSupport {
|
||||||
|
|
||||||
frame = stompConnection.receiveFrame();
|
frame = stompConnection.receiveFrame();
|
||||||
|
|
||||||
assertTrue(frame.trim().endsWith(xmlText));
|
assertTrue(frame.endsWith("\n\n"));
|
||||||
|
|
||||||
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
|
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
|
||||||
stompConnection.sendFrame(frame);
|
stompConnection.sendFrame(frame);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testTransformationNotOverrideSubscription() throws Exception {
|
public void testTransformationSendXMLMap() throws Exception {
|
||||||
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
|
||||||
|
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
frame = stompConnection.receiveFrame();
|
||||||
|
assertTrue(frame.startsWith("CONNECTED"));
|
||||||
|
|
||||||
|
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_MAP_XML + "\n\n" + xmlMap + Stomp.NULL;
|
||||||
|
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
MapMessage message = (MapMessage) consumer.receive(1000);
|
||||||
|
assertNotNull(message);
|
||||||
|
assertEquals(message.getString("name"), "Dejan");
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testTransformationSendJSONMap() throws Exception {
|
||||||
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
|
||||||
|
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
frame = stompConnection.receiveFrame();
|
||||||
|
assertTrue(frame.startsWith("CONNECTED"));
|
||||||
|
|
||||||
|
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_MAP_JSON + "\n\n" + jsonMap + Stomp.NULL;
|
||||||
|
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
MapMessage message = (MapMessage) consumer.receive(1000);
|
||||||
|
assertNotNull(message);
|
||||||
|
assertEquals(message.getString("name"), "Dejan");
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testTransformationReceiveXMLMap() throws Exception {
|
||||||
|
|
||||||
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
|
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
|
||||||
ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
|
MapMessage message = session.createMapMessage();
|
||||||
message.setStringProperty("transformation", "jms-xml");
|
message.setString("name", "Dejan");
|
||||||
|
message.setString("city", "Belgrade");
|
||||||
producer.send(message);
|
producer.send(message);
|
||||||
|
|
||||||
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||||
|
@ -718,12 +821,37 @@ public class StompTest extends CombinationTestSupport {
|
||||||
frame = stompConnection.receiveFrame();
|
frame = stompConnection.receiveFrame();
|
||||||
assertTrue(frame.startsWith("CONNECTED"));
|
assertTrue(frame.startsWith("CONNECTED"));
|
||||||
|
|
||||||
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:jms-json" + "\n\n" + Stomp.NULL;
|
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto\n" + "transformation:" + Stomp.Transformations.JMS_MAP_XML + "\n\n" + Stomp.NULL;
|
||||||
stompConnection.sendFrame(frame);
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
frame = stompConnection.receiveFrame();
|
frame = stompConnection.receiveFrame();
|
||||||
|
|
||||||
assertTrue(frame.trim().endsWith(jsonText));
|
assertTrue(frame.trim().endsWith(xmlMap.trim()));
|
||||||
|
|
||||||
|
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testTransformationReceiveJSONMap() throws Exception {
|
||||||
|
|
||||||
|
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
|
||||||
|
MapMessage message = session.createMapMessage();
|
||||||
|
message.setString("name", "Dejan");
|
||||||
|
message.setString("city", "Belgrade");
|
||||||
|
producer.send(message);
|
||||||
|
|
||||||
|
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
frame = stompConnection.receiveFrame();
|
||||||
|
assertTrue(frame.startsWith("CONNECTED"));
|
||||||
|
|
||||||
|
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto\n" + "transformation:" + Stomp.Transformations.JMS_MAP_JSON + "\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
frame = stompConnection.receiveFrame();
|
||||||
|
|
||||||
|
assertTrue(frame.trim().endsWith(jsonMap.trim()));
|
||||||
|
|
||||||
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
|
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
|
||||||
stompConnection.sendFrame(frame);
|
stompConnection.sendFrame(frame);
|
||||||
|
|
|
@ -0,0 +1,39 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.xbean;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
|
|
||||||
|
import org.apache.activemq.broker.BrokerFactory;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.springframework.context.ApplicationContext;
|
||||||
|
import org.springframework.context.ApplicationContextAware;
|
||||||
|
|
||||||
|
|
||||||
|
public class ApplicationContextAwareTest extends MultipleTestsWithEmbeddedBrokerTest {
|
||||||
|
|
||||||
|
protected BrokerService createBroker() throws Exception {
|
||||||
|
return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/xbean/activemq.xml"));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testContextAware() {
|
||||||
|
assertTrue(broker instanceof XBeanBrokerService);
|
||||||
|
assertTrue(broker instanceof ApplicationContextAware);
|
||||||
|
ApplicationContext context = ((XBeanBrokerService)broker).getApplicationContext();
|
||||||
|
assertTrue(context.containsBean("org.apache.activemq.xbean.XBeanBrokerService"));
|
||||||
|
}
|
||||||
|
}
|
|
@ -21,6 +21,10 @@
|
||||||
<beans>
|
<beans>
|
||||||
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
|
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
|
||||||
|
|
||||||
|
<bean class="org.apache.activemq.util.XStreamFactoryBean" name="xstream">
|
||||||
|
<property name="annotatedClass"><value>org.apache.activemq.transport.stomp.SamplePojo</value></property>
|
||||||
|
</bean>
|
||||||
|
|
||||||
<broker useJmx="false" persistent="false" xmlns="http://activemq.org/config/1.0" populateJMSXUserID="true">
|
<broker useJmx="false" persistent="false" xmlns="http://activemq.org/config/1.0" populateJMSXUserID="true">
|
||||||
|
|
||||||
<transportConnectors>
|
<transportConnectors>
|
||||||
|
|
Loading…
Reference in New Issue