mirror of https://github.com/apache/activemq.git
Applying patch at https://issues.apache.org/activemq/browse/AMQ-943
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@612544 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
634da7a88e
commit
2b5608f761
|
@ -108,6 +108,12 @@
|
|||
<artifactId>commons-pool</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.thoughtworks.xstream</groupId>
|
||||
<artifactId>xstream</artifactId>
|
||||
<version>1.2.2</version>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
|
||||
<!-- for XML parsing -->
|
||||
<dependency>
|
||||
|
@ -169,6 +175,18 @@
|
|||
<optional>false</optional>
|
||||
<type>test-jar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.codehaus.jettison</groupId>
|
||||
<artifactId>jettison</artifactId>
|
||||
<version>1.0-RC1</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>stax</groupId>
|
||||
<artifactId>stax-api</artifactId>
|
||||
<version>1.0.1</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<!-- testing camel helpers -->
|
||||
<dependency>
|
||||
|
|
|
@ -52,6 +52,7 @@ import org.apache.activemq.command.ShutdownInfo;
|
|||
import org.apache.activemq.command.TransactionId;
|
||||
import org.apache.activemq.command.TransactionInfo;
|
||||
import org.apache.activemq.util.ByteArrayOutputStream;
|
||||
import org.apache.activemq.util.FactoryFinder;
|
||||
import org.apache.activemq.util.IOExceptionSupport;
|
||||
import org.apache.activemq.util.IdGenerator;
|
||||
import org.apache.activemq.util.IntrospectionSupport;
|
||||
|
@ -84,6 +85,7 @@ public class ProtocolConverter {
|
|||
private int lastCommandId;
|
||||
private final AtomicBoolean connected = new AtomicBoolean(false);
|
||||
private final FrameTranslator frameTranslator;
|
||||
private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/");
|
||||
|
||||
public ProtocolConverter(StompTransportFilter stompTransportFilter, FrameTranslator translator) {
|
||||
this.transportFilter = stompTransportFilter;
|
||||
|
@ -131,12 +133,26 @@ public class ProtocolConverter {
|
|||
protected void sendToStomp(StompFrame command) throws IOException {
|
||||
transportFilter.sendToStomp(command);
|
||||
}
|
||||
|
||||
protected FrameTranslator findTranslator(String header) {
|
||||
FrameTranslator translator = frameTranslator;
|
||||
try {
|
||||
if (header != null) {
|
||||
translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER
|
||||
.newInstance(header);
|
||||
}
|
||||
} catch (Exception ignore) {
|
||||
// if anything goes wrong use the default translator
|
||||
}
|
||||
|
||||
return translator;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a stomp command
|
||||
*
|
||||
* @param command
|
||||
*/
|
||||
* Convert a stomp command
|
||||
*
|
||||
* @param command
|
||||
*/
|
||||
public void onStompCommad(StompFrame command) throws IOException, JMSException {
|
||||
try {
|
||||
|
||||
|
@ -340,12 +356,13 @@ public class ProtocolConverter {
|
|||
|
||||
protected void onStompSubscribe(StompFrame command) throws ProtocolException {
|
||||
checkConnected();
|
||||
FrameTranslator translator = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION));
|
||||
Map<String, String> headers = command.getHeaders();
|
||||
|
||||
String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID);
|
||||
String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION);
|
||||
|
||||
ActiveMQDestination actualDest = frameTranslator.convertDestination(this, destination);
|
||||
ActiveMQDestination actualDest = translator.convertDestination(this, destination);
|
||||
ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
|
||||
ConsumerInfo consumerInfo = new ConsumerInfo(id);
|
||||
consumerInfo.setPrefetchSize(1000);
|
||||
|
@ -356,9 +373,9 @@ public class ProtocolConverter {
|
|||
|
||||
IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
|
||||
|
||||
consumerInfo.setDestination(frameTranslator.convertDestination(this, destination));
|
||||
consumerInfo.setDestination(translator.convertDestination(this, destination));
|
||||
|
||||
StompSubscription stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo);
|
||||
StompSubscription stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
|
||||
stompSubscription.setDestination(actualDest);
|
||||
|
||||
String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE);
|
||||
|
@ -380,7 +397,7 @@ public class ProtocolConverter {
|
|||
ActiveMQDestination destination = null;
|
||||
Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
|
||||
if (o != null) {
|
||||
destination = frameTranslator.convertDestination(this, (String)o);
|
||||
destination = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertDestination(this, (String)o);
|
||||
}
|
||||
|
||||
String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID);
|
||||
|
@ -533,12 +550,12 @@ public class ProtocolConverter {
|
|||
}
|
||||
|
||||
public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException {
|
||||
ActiveMQMessage msg = frameTranslator.convertFrame(this, command);
|
||||
ActiveMQMessage msg = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertFrame(this, command);
|
||||
return msg;
|
||||
}
|
||||
|
||||
public StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException {
|
||||
return frameTranslator.convertMessage(this, message);
|
||||
return findTranslator(message.getStringProperty(Stomp.Headers.TRANSFORMATION)).convertMessage(this, message);
|
||||
}
|
||||
|
||||
public StompTransportFilter getTransportFilter() {
|
||||
|
|
|
@ -48,6 +48,7 @@ public interface Stomp {
|
|||
String RECEIPT_REQUESTED = "receipt";
|
||||
String TRANSACTION = "transaction";
|
||||
String CONTENT_LENGTH = "content-length";
|
||||
String TRANSFORMATION = "transformation";
|
||||
|
||||
public interface Response {
|
||||
String RECEIPT_ID = "receipt-id";
|
||||
|
|
|
@ -49,11 +49,13 @@ public class StompSubscription {
|
|||
|
||||
private String ackMode = AUTO_ACK;
|
||||
private ActiveMQDestination destination;
|
||||
private String transformation;
|
||||
|
||||
public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo) {
|
||||
public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
|
||||
this.protocolConverter = stompTransport;
|
||||
this.subscriptionId = subscriptionId;
|
||||
this.consumerInfo = consumerInfo;
|
||||
this.transformation = transformation;
|
||||
}
|
||||
|
||||
void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
|
||||
|
@ -68,7 +70,10 @@ public class StompSubscription {
|
|||
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
|
||||
protocolConverter.getTransportFilter().sendToActiveMQ(ack);
|
||||
}
|
||||
|
||||
if (transformation != null) {
|
||||
message.setReadOnlyProperties(false);
|
||||
message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation);
|
||||
}
|
||||
StompFrame command = protocolConverter.convertMessage(message);
|
||||
|
||||
command.setAction(Stomp.Responses.MESSAGE);
|
||||
|
|
|
@ -30,7 +30,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
/**
|
||||
* The StompTransportFilter normally sits on top of a TcpTransport that has been
|
||||
* configured with the StompWireFormat and is used to convert STOMP commands to
|
||||
* ActiveMQ commands. All of the coversion work is done by delegating to the
|
||||
* ActiveMQ commands. All of the conversion work is done by delegating to the
|
||||
* ProtocolConverter.
|
||||
*
|
||||
* @author <a href="http://hiramchirino.com">chirino</a>
|
||||
|
|
|
@ -0,0 +1,138 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.stomp;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.io.StringReader;
|
||||
import java.io.StringWriter;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.ObjectMessage;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import org.apache.activemq.command.ActiveMQBytesMessage;
|
||||
import org.apache.activemq.command.ActiveMQMessage;
|
||||
import org.apache.activemq.command.ActiveMQObjectMessage;
|
||||
import org.apache.activemq.command.ActiveMQTextMessage;
|
||||
|
||||
import com.thoughtworks.xstream.XStream;
|
||||
import com.thoughtworks.xstream.io.HierarchicalStreamReader;
|
||||
import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
|
||||
import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver;
|
||||
import com.thoughtworks.xstream.io.xml.PrettyPrintWriter;
|
||||
import com.thoughtworks.xstream.io.xml.XppReader;
|
||||
|
||||
/**
|
||||
* Frame translator implementation that uses XStream to convert messages to and from XML and JSON
|
||||
* @author <a href="mailto:dejan@nighttale.net">Dejan Bosanac</a>
|
||||
*/
|
||||
public class XStreamFrameTranslator extends LegacyFrameTranslator {
|
||||
|
||||
XStream xStream = new XStream();
|
||||
|
||||
public ActiveMQMessage convertFrame(ProtocolConverter converter,
|
||||
StompFrame command) throws JMSException, ProtocolException {
|
||||
Map headers = command.getHeaders();
|
||||
ActiveMQMessage msg;
|
||||
if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) {
|
||||
msg = super.convertFrame(converter, command);
|
||||
} else {
|
||||
try {
|
||||
ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage();
|
||||
Object obj = unmarshall(new String(command.getContent(), "UTF-8"), (String)headers.get(Stomp.Headers.TRANSFORMATION));
|
||||
objMsg.setObject((Serializable)obj);
|
||||
msg = objMsg;
|
||||
} catch (Throwable e) {
|
||||
msg = super.convertFrame(converter, command);
|
||||
}
|
||||
}
|
||||
FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
|
||||
return msg;
|
||||
}
|
||||
|
||||
public StompFrame convertMessage(ProtocolConverter converter,
|
||||
ActiveMQMessage message) throws IOException, JMSException {
|
||||
if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) {
|
||||
StompFrame command = new StompFrame();
|
||||
command.setAction(Stomp.Responses.MESSAGE);
|
||||
Map<String, String> headers = new HashMap<String, String>(25);
|
||||
command.setHeaders(headers);
|
||||
|
||||
FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(converter, message, command, this);
|
||||
ActiveMQObjectMessage msg = (ActiveMQObjectMessage)message.copy();
|
||||
command.setContent(marshall(msg.getObject(), headers.get(Stomp.Headers.TRANSFORMATION)).getBytes("UTF-8"));
|
||||
return command;
|
||||
|
||||
} else {
|
||||
return super.convertMessage(converter, message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Marshalls the Object to a string using XML or JSON
|
||||
* encoding
|
||||
*/
|
||||
protected String marshall(Serializable object, String transformation) throws JMSException {
|
||||
StringWriter buffer = new StringWriter();
|
||||
HierarchicalStreamWriter out;
|
||||
if (transformation.equalsIgnoreCase("jms-json")) {
|
||||
out = new JettisonMappedXmlDriver().createWriter(buffer);
|
||||
} else {
|
||||
out = new PrettyPrintWriter(buffer);
|
||||
}
|
||||
getXStream().marshal(object, out);
|
||||
return buffer.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Unmarshalls the XML or JSON encoded message to an
|
||||
* Object
|
||||
*/
|
||||
protected Object unmarshall(String text, String transformation) {
|
||||
HierarchicalStreamReader in;
|
||||
if (transformation.equalsIgnoreCase("jms-json")) {
|
||||
in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
|
||||
} else {
|
||||
in = new XppReader(new StringReader(text));
|
||||
}
|
||||
return getXStream().unmarshal(in);
|
||||
}
|
||||
|
||||
// Properties
|
||||
// -------------------------------------------------------------------------
|
||||
public XStream getXStream() {
|
||||
if (xStream == null) {
|
||||
xStream = createXStream();
|
||||
}
|
||||
return xStream;
|
||||
}
|
||||
|
||||
public void setXStream(XStream xStream) {
|
||||
this.xStream = xStream;
|
||||
}
|
||||
|
||||
// Implementation methods
|
||||
// -------------------------------------------------------------------------
|
||||
protected XStream createXStream() {
|
||||
return new XStream();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,17 @@
|
|||
## ---------------------------------------------------------------------------
|
||||
## Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
## contributor license agreements. See the NOTICE file distributed with
|
||||
## this work for additional information regarding copyright ownership.
|
||||
## The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
## (the "License"); you may not use this file except in compliance with
|
||||
## the License. You may obtain a copy of the License at
|
||||
##
|
||||
## http://www.apache.org/licenses/LICENSE-2.0
|
||||
##
|
||||
## Unless required by applicable law or agreed to in writing, software
|
||||
## distributed under the License is distributed on an "AS IS" BASIS,
|
||||
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
## See the License for the specific language governing permissions and
|
||||
## limitations under the License.
|
||||
## ---------------------------------------------------------------------------
|
||||
class=org.apache.activemq.transport.stomp.XStreamFrameTranslator
|
|
@ -0,0 +1,17 @@
|
|||
## ---------------------------------------------------------------------------
|
||||
## Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
## contributor license agreements. See the NOTICE file distributed with
|
||||
## this work for additional information regarding copyright ownership.
|
||||
## The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
## (the "License"); you may not use this file except in compliance with
|
||||
## the License. You may obtain a copy of the License at
|
||||
##
|
||||
## http://www.apache.org/licenses/LICENSE-2.0
|
||||
##
|
||||
## Unless required by applicable law or agreed to in writing, software
|
||||
## distributed under the License is distributed on an "AS IS" BASIS,
|
||||
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
## See the License for the specific language governing permissions and
|
||||
## limitations under the License.
|
||||
## ---------------------------------------------------------------------------
|
||||
class=org.apache.activemq.transport.stomp.XStreamFrameTranslator
|
|
@ -0,0 +1,49 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.stomp;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
public class SamplePojo implements Serializable {
|
||||
private String name;
|
||||
private String city;
|
||||
|
||||
public SamplePojo() {
|
||||
}
|
||||
|
||||
public SamplePojo(String name, String city) {
|
||||
this.name = name;
|
||||
this.city = city;
|
||||
}
|
||||
|
||||
|
||||
public String getCity() {
|
||||
return city;
|
||||
}
|
||||
|
||||
public void setCity(String city) {
|
||||
this.city = city;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
}
|
|
@ -30,19 +30,16 @@ import javax.jms.Connection;
|
|||
import javax.jms.JMSException;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.ObjectMessage;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.CombinationTestSupport;
|
||||
import org.apache.activemq.broker.BrokerFactory;
|
||||
import org.apache.activemq.broker.BrokerPlugin;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.TransportConnector;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTextMessage;
|
||||
import org.apache.activemq.security.AuthorizationPlugin;
|
||||
import org.apache.activemq.security.SimpleSecurityBrokerSystemTest;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
|
@ -57,6 +54,15 @@ public class StompTest extends CombinationTestSupport {
|
|||
private Connection connection;
|
||||
private Session session;
|
||||
private ActiveMQQueue queue;
|
||||
private String xmlText = "<org.apache.activemq.transport.stomp.SamplePojo>\n"
|
||||
+ " <name>Dejan</name>\n"
|
||||
+ " <city>Belgrade</city>\n"
|
||||
+ "</org.apache.activemq.transport.stomp.SamplePojo>";
|
||||
|
||||
private String jsonText = "{\"org.apache.activemq.transport.stomp.SamplePojo\":{"
|
||||
+ "\"name\":\"Dejan\","
|
||||
+ "\"city\":\"Belgrade\""
|
||||
+ "}}";
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
broker = BrokerFactory.createBroker(new URI(confUri));
|
||||
|
@ -557,6 +563,172 @@ public class StompTest extends CombinationTestSupport {
|
|||
|
||||
}
|
||||
|
||||
public void testTransformationUnknownTranslator() throws Exception {
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
|
||||
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||
stompConnection.sendFrame(frame);
|
||||
|
||||
frame = stompConnection.receiveFrame();
|
||||
assertTrue(frame.startsWith("CONNECTED"));
|
||||
|
||||
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:test" + "\n\n" + "Hello World" + Stomp.NULL;
|
||||
|
||||
stompConnection.sendFrame(frame);
|
||||
|
||||
TextMessage message = (TextMessage)consumer.receive(1000);
|
||||
assertNotNull(message);
|
||||
assertEquals("Hello World", message.getText());
|
||||
}
|
||||
|
||||
public void testTransformationFailed() throws Exception {
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
|
||||
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||
stompConnection.sendFrame(frame);
|
||||
|
||||
frame = stompConnection.receiveFrame();
|
||||
assertTrue(frame.startsWith("CONNECTED"));
|
||||
|
||||
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:jms-xml" + "\n\n" + "Hello World" + Stomp.NULL;
|
||||
|
||||
stompConnection.sendFrame(frame);
|
||||
|
||||
TextMessage message = (TextMessage)consumer.receive(1000);
|
||||
assertNotNull(message);
|
||||
assertEquals("Hello World", message.getText());
|
||||
}
|
||||
|
||||
public void testTransformationSendXML() throws Exception {
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
|
||||
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||
stompConnection.sendFrame(frame);
|
||||
|
||||
frame = stompConnection.receiveFrame();
|
||||
assertTrue(frame.startsWith("CONNECTED"));
|
||||
|
||||
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:jms-xml" + "\n\n" + xmlText + Stomp.NULL;
|
||||
|
||||
stompConnection.sendFrame(frame);
|
||||
|
||||
ObjectMessage message = (ObjectMessage)consumer.receive(1000);
|
||||
assertNotNull(message);
|
||||
SamplePojo object = (SamplePojo)message.getObject();
|
||||
assertEquals("Dejan", object.getName());
|
||||
}
|
||||
|
||||
public void testTransformationSendJSON() throws Exception {
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
|
||||
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||
stompConnection.sendFrame(frame);
|
||||
|
||||
frame = stompConnection.receiveFrame();
|
||||
assertTrue(frame.startsWith("CONNECTED"));
|
||||
|
||||
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:jms-json" + "\n\n" + jsonText + Stomp.NULL;
|
||||
|
||||
stompConnection.sendFrame(frame);
|
||||
|
||||
ObjectMessage message = (ObjectMessage)consumer.receive(1000);
|
||||
assertNotNull(message);
|
||||
SamplePojo object = (SamplePojo)message.getObject();
|
||||
assertEquals("Dejan", object.getName());
|
||||
}
|
||||
|
||||
public void testTransformationSubscribeXML() throws Exception {
|
||||
|
||||
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
|
||||
ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
|
||||
producer.send(message);
|
||||
|
||||
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||
stompConnection.sendFrame(frame);
|
||||
|
||||
frame = stompConnection.receiveFrame();
|
||||
assertTrue(frame.startsWith("CONNECTED"));
|
||||
|
||||
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:jms-xml" + "\n\n" + Stomp.NULL;
|
||||
stompConnection.sendFrame(frame);
|
||||
|
||||
frame = stompConnection.receiveFrame();
|
||||
|
||||
assertTrue(frame.trim().endsWith(xmlText));
|
||||
|
||||
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
|
||||
stompConnection.sendFrame(frame);
|
||||
}
|
||||
|
||||
public void testTransformationReceiveJSON() throws Exception {
|
||||
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
|
||||
ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
|
||||
producer.send(message);
|
||||
|
||||
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||
stompConnection.sendFrame(frame);
|
||||
|
||||
frame = stompConnection.receiveFrame();
|
||||
assertTrue(frame.startsWith("CONNECTED"));
|
||||
|
||||
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:jms-json" + "\n\n" + Stomp.NULL;
|
||||
stompConnection.sendFrame(frame);
|
||||
|
||||
frame = stompConnection.receiveFrame();
|
||||
|
||||
assertTrue(frame.trim().endsWith(jsonText));
|
||||
|
||||
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
|
||||
stompConnection.sendFrame(frame);
|
||||
}
|
||||
|
||||
public void testTransformationReceiveXML() throws Exception {
|
||||
|
||||
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
|
||||
ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
|
||||
message.setStringProperty("transformation", "jms-xml");
|
||||
producer.send(message);
|
||||
|
||||
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||
stompConnection.sendFrame(frame);
|
||||
|
||||
frame = stompConnection.receiveFrame();
|
||||
assertTrue(frame.startsWith("CONNECTED"));
|
||||
|
||||
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n\n" + Stomp.NULL;
|
||||
stompConnection.sendFrame(frame);
|
||||
|
||||
frame = stompConnection.receiveFrame();
|
||||
|
||||
assertTrue(frame.trim().endsWith(xmlText));
|
||||
|
||||
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
|
||||
stompConnection.sendFrame(frame);
|
||||
}
|
||||
|
||||
public void testTransformationNotOverrideSubscription() throws Exception {
|
||||
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
|
||||
ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
|
||||
message.setStringProperty("transformation", "jms-xml");
|
||||
producer.send(message);
|
||||
|
||||
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||
stompConnection.sendFrame(frame);
|
||||
|
||||
frame = stompConnection.receiveFrame();
|
||||
assertTrue(frame.startsWith("CONNECTED"));
|
||||
|
||||
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:jms-json" + "\n\n" + Stomp.NULL;
|
||||
stompConnection.sendFrame(frame);
|
||||
|
||||
frame = stompConnection.receiveFrame();
|
||||
|
||||
assertTrue(frame.trim().endsWith(jsonText));
|
||||
|
||||
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
|
||||
stompConnection.sendFrame(frame);
|
||||
}
|
||||
|
||||
protected void assertClients(int expected) throws Exception {
|
||||
org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
|
||||
int actual = clients.length;
|
||||
|
|
Loading…
Reference in New Issue