git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@958103 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2010-06-25 20:23:56 +00:00
parent ddb2c91c20
commit 9807ee3253
10 changed files with 672 additions and 21 deletions

View File

@ -25,24 +25,29 @@ import java.util.Map;
import javax.jms.JMSException;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.DataStructure;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import com.sun.tools.javac.util.Log;
import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.io.HierarchicalStreamReader;
import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver;
import com.thoughtworks.xstream.io.json.JsonHierarchicalStreamDriver;
import com.thoughtworks.xstream.io.xml.PrettyPrintWriter;
import com.thoughtworks.xstream.io.xml.XppReader;
/**
* Frame translator implementation that uses XStream to convert messages to and
* from XML and JSON
*
*
* @author <a href="mailto:dejan@nighttale.net">Dejan Bosanac</a>
*/
public class JmsFrameTranslator extends LegacyFrameTranslator implements
@ -102,13 +107,20 @@ public class JmsFrameTranslator extends LegacyFrameTranslator implements
FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
converter, message, command, this);
ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy();
if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_XML.toString());
} else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_JSON.toString());
}
ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy();
command.setContent(marshall(msg.getObject(),
headers.get(Stomp.Headers.TRANSFORMATION))
.getBytes("UTF-8"));
return command;
} else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
} else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
StompFrame command = new StompFrame();
command.setAction(Stomp.Responses.MESSAGE);
Map<String, String> headers = new HashMap<String, String>(25);
@ -116,11 +128,39 @@ public class JmsFrameTranslator extends LegacyFrameTranslator implements
FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
converter, message, command, this);
if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_XML.toString());
} else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_JSON.toString());
}
ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
command.setContent(marshall((Serializable)msg.getContentMap(),
headers.get(Stomp.Headers.TRANSFORMATION))
.getBytes("UTF-8"));
return command;
return command;
} else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE &&
AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
StompFrame command = new StompFrame();
command.setAction(Stomp.Responses.MESSAGE);
Map<String, String> headers = new HashMap<String, String>(25);
command.setHeaders(headers);
FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
converter, message, command, this);
if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_XML.toString());
} else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_JSON.toString());
}
String body = marshallAdvisory(message.getDataStructure(),
headers.get(Stomp.Headers.TRANSFORMATION));
command.setContent(body.getBytes("UTF-8"));
return command;
} else {
return super.convertMessage(converter, message);
}
@ -148,7 +188,7 @@ public class JmsFrameTranslator extends LegacyFrameTranslator implements
objMsg.setObject((Serializable) obj);
return objMsg;
}
protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException {
ActiveMQMapMessage mapMsg = new ActiveMQMapMessage();
Map<String, Object> map = (Map<String, Object>)getXStream().unmarshal(in);
@ -157,8 +197,23 @@ public class JmsFrameTranslator extends LegacyFrameTranslator implements
}
return mapMsg;
}
protected String marshallAdvisory(final DataStructure ds, String transformation) {
StringWriter buffer = new StringWriter();
HierarchicalStreamWriter out;
if (transformation.toLowerCase().endsWith("json")) {
out = new JettisonMappedXmlDriver().createWriter(buffer);
} else {
out = new PrettyPrintWriter(buffer);
}
XStream xstream = getXStream();
xstream.setMode(XStream.NO_REFERENCES);
xstream.aliasPackage("", "org.apache.activemq.command");
xstream.marshal(ds, out);
return buffer.toString();
}
// Properties
// -------------------------------------------------------------------------

View File

@ -23,17 +23,17 @@ import java.util.Map;
import javax.jms.Destination;
import javax.jms.JMSException;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.io.json.JsonHierarchicalStreamDriver;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.command.*;
/**
* Implements ActiveMQ 4.0 translations
*/
public class LegacyFrameTranslator implements FrameTranslator {
public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command) throws JMSException, ProtocolException {
final Map headers = command.getHeaders();
final ActiveMQMessage msg;
@ -77,6 +77,14 @@ public class LegacyFrameTranslator implements FrameTranslator {
headers.put(Stomp.Headers.CONTENT_LENGTH, "" + data.length);
command.setContent(data);
} else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE &&
AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
converter, message, command, this);
String body = marshallAdvisory(message.getDataStructure());
command.setContent(body.getBytes("UTF-8"));
}
return command;
}
@ -92,7 +100,7 @@ public class LegacyFrameTranslator implements FrameTranslator {
if( rc!=null ) {
return rc;
}
StringBuffer buffer = new StringBuffer();
if (activeMQDestination.isQueue()) {
if (activeMQDestination.isTemporary()) {
@ -135,4 +143,16 @@ public class LegacyFrameTranslator implements FrameTranslator {
+ "must begine with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
}
}
/**
* Return an Advisory message as a JSON formatted string
* @param ds
* @return
*/
protected String marshallAdvisory(final DataStructure ds) {
XStream xstream = new XStream(new JsonHierarchicalStreamDriver());
xstream.setMode(XStream.NO_REFERENCES);
xstream.aliasPackage("", "org.apache.activemq.command");
return xstream.toXML(ds);
}
}

View File

@ -117,16 +117,24 @@ public interface Stomp {
String MESSAGE_ID = "message-id";
}
}
public enum Transformations {
JMS_BYTE, JMS_OBJECT_XML, JMS_OBJECT_JSON, JMS_MAP_XML, JMS_MAP_JSON;
JMS_BYTE,
JMS_XML,
JMS_JSON,
JMS_OBJECT_XML,
JMS_OBJECT_JSON,
JMS_MAP_XML,
JMS_MAP_JSON,
JMS_ADVISORY_XML,
JMS_ADVISORY_JSON;
public String toString() {
return name().replaceAll("_", "-").toLowerCase();
}
public static Transformations getValue(String value) {
return valueOf(value.replaceAll("-", "_").toUpperCase());
}
}
}
}

View File

@ -25,6 +25,7 @@ import java.util.Map.Entry;
import javax.jms.JMSException;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerInfo;
@ -80,7 +81,7 @@ public class StompSubscription {
boolean ignoreTransformation = false;
if (transformation != null) {
if (transformation != null && !( message instanceof ActiveMQBytesMessage ) ) {
message.setReadOnlyProperties(false);
message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation);
} else {

View File

@ -0,0 +1,17 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.activemq.transport.stomp.JmsFrameTranslator

View File

@ -0,0 +1,17 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.activemq.transport.stomp.JmsFrameTranslator

View File

@ -0,0 +1,17 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.activemq.transport.stomp.JmsFrameTranslator

View File

@ -0,0 +1,17 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.activemq.transport.stomp.JmsFrameTranslator

View File

@ -0,0 +1,340 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsTestSupport;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.URISupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.*;
import java.io.File;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @version $Revision: 1461 $
*/
public class StompAdvisoryTest extends TestCase {
private static final Log LOG = LogFactory.getLog(StompAdvisoryTest.class);
protected ConnectionFactory factory;
protected ActiveMQConnection connection;
protected BrokerService broker;
StompConnection stompConnection;
URI tcpBrokerUri;
URI stompBrokerUri;
private PolicyEntry createPolicyEntry() {
PolicyEntry policy = new PolicyEntry();
policy.setAdvisdoryForFastProducers(true);
policy.setAdvisoryForConsumed(true);
policy.setAdvisoryForDelivery(true);
policy.setAdvisoryForDiscardingMessages(true);
policy.setAdvisoryForSlowConsumers(true);
policy.setAdvisoryWhenFull(true);
policy.setProducerFlowControl(false);
ConstantPendingMessageLimitStrategy strategy = new ConstantPendingMessageLimitStrategy();
strategy.setLimit(10);
policy.setPendingMessageLimitStrategy(strategy);
return policy;
}
protected BrokerService createBroker() throws Exception {
BrokerService broker = BrokerFactory.createBroker(new URI("broker://()/localhost?useJmx=false"));
broker.setPersistent(false);
PolicyEntry policy = new PolicyEntry();
policy.setAdvisdoryForFastProducers(true);
policy.setAdvisoryForConsumed(true);
policy.setAdvisoryForDelivery(true);
policy.setAdvisoryForDiscardingMessages(true);
policy.setAdvisoryForSlowConsumers(true);
policy.setAdvisoryWhenFull(true);
policy.setProducerFlowControl(false);
ConstantPendingMessageLimitStrategy strategy = new ConstantPendingMessageLimitStrategy();
strategy.setLimit(10);
policy.setPendingMessageLimitStrategy(strategy);
PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy);
broker.setDestinationPolicy(pMap);
broker.setDeleteAllMessagesOnStartup(true);
broker.addConnector("tcp://localhost:0");
broker.addConnector("stomp://localhost:0");
return broker;
}
protected void setUp() throws Exception {
super.setUp();
if (System.getProperty("basedir") == null) {
File file = new File(".");
System.setProperty("basedir", file.getAbsolutePath());
}
broker = createBroker();
broker.start();
tcpBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(0).getConnectUri());
stompBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(1).getConnectUri());
LOG.info("Producing using TCP uri: " + tcpBrokerUri);
LOG.info("consuming using STOMP uri: " + stompBrokerUri);
stompConnection = new StompConnection();
stompConnection.open(new Socket("localhost", stompBrokerUri.getPort()));
}
protected void tearDown() throws Exception {
stompConnection.disconnect();
stompConnection.close();
}
public void testConnectionAdvisory() throws Exception {
Destination dest = new ActiveMQQueue("testConnectionAdvisory");
stompConnection.connect("system", "manager");
stompConnection.subscribe("/topic/ActiveMQ.Advisory.Connection", Stomp.Headers.Subscribe.AckModeValues.AUTO);
// Now connect via openwire and check we get the advisory
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcpBrokerUri);
Connection c = factory.createConnection();
c.start();
StompFrame f = stompConnection.receive();
LOG.debug(f);
assertEquals(f.getAction(),"MESSAGE");
assertTrue("Should have a body", f.getBody().length() > 0);
assertTrue(f.getBody().startsWith("{\"ConnectionInfo\":"));
Map<String,String> headers = f.getHeaders();
c.stop();
c.close();
f = stompConnection.receive();
LOG.debug(f);
assertEquals(f.getAction(),"MESSAGE");
assertNotNull("Body is not null", f.getBody());
assertTrue("Body should have content", f.getBody().length() > 0);
assertTrue(f.getBody().startsWith("{\"ConnectionInfo\":"));
}
public void testConnectionAdvisoryJSON() throws Exception {
Destination dest = new ActiveMQQueue("testConnectionAdvisory");
HashMap<String, String> subheaders = new HashMap<String, String>(1);
subheaders.put("transformation", Stomp.Transformations.JMS_JSON.toString());
stompConnection.connect("system", "manager");
stompConnection.subscribe("/topic/ActiveMQ.Advisory.Connection",
Stomp.Headers.Subscribe.AckModeValues.AUTO, subheaders);
// Now connect via openwire and check we get the advisory
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcpBrokerUri);
Connection c = factory.createConnection();
c.start();
StompFrame f = stompConnection.receive();
LOG.debug(f);
assertEquals(f.getAction(),"MESSAGE");
assertTrue("Should have a body", f.getBody().length() > 0);
assertTrue(f.getBody().startsWith("{\"ConnectionInfo\":"));
Map<String,String> headers = f.getHeaders();
c.stop();
c.close();
f = stompConnection.receive();
LOG.debug(f);
assertEquals(f.getAction(),"MESSAGE");
assertNotNull("Body is not null", f.getBody());
assertTrue("Body should have content", f.getBody().length() > 0);
assertTrue(f.getBody().startsWith("{\"ConnectionInfo\":"));
}
public void testConnectionAdvisoryXML() throws Exception {
Destination dest = new ActiveMQQueue("testConnectionAdvisory");
HashMap<String, String> subheaders = new HashMap<String, String>(1);
subheaders.put("transformation", Stomp.Transformations.JMS_XML.toString());
stompConnection.connect("system", "manager");
stompConnection.subscribe("/topic/ActiveMQ.Advisory.Connection",
Stomp.Headers.Subscribe.AckModeValues.AUTO, subheaders);
// Now connect via openwire and check we get the advisory
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcpBrokerUri);
Connection c = factory.createConnection();
c.start();
StompFrame f = stompConnection.receive();
LOG.debug(f);
assertEquals(f.getAction(),"MESSAGE");
assertTrue("Should have a body", f.getBody().length() > 0);
assertTrue(f.getBody().startsWith("<ConnectionInfo>"));
Map<String,String> headers = f.getHeaders();
c.stop();
c.close();
f = stompConnection.receive();
LOG.debug(f);
assertEquals(f.getAction(),"MESSAGE");
assertNotNull("Body is not null", f.getBody());
assertTrue("Body should have content", f.getBody().length() > 0);
assertTrue(f.getBody().startsWith("<ConnectionInfo>"));
}
public void testConsumerAdvisory() throws Exception {
Destination dest = new ActiveMQQueue("testConsumerAdvisory");
stompConnection.connect("system", "manager");
stompConnection.subscribe("/topic/ActiveMQ.Advisory.Consumer.>", Stomp.Headers.Subscribe.AckModeValues.AUTO);
// Now connect via openwire and check we get the advisory
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcpBrokerUri);
Connection c = factory.createConnection();
c.start();
Session session = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(dest);
StompFrame f = stompConnection.receive();
LOG.debug(f);
assertEquals(f.getAction(),"MESSAGE");
assertTrue("Should have a body", f.getBody().length() > 0);
assertTrue(f.getBody().startsWith("{\"ConsumerInfo\":"));
c.stop();
c.close();
}
public void testProducerAdvisory() throws Exception {
Destination dest = new ActiveMQQueue("testProducerAdvisory");
stompConnection.connect("system", "manager");
stompConnection.subscribe("/topic/ActiveMQ.Advisory.Producer.>", Stomp.Headers.Subscribe.AckModeValues.AUTO);
// Now connect via openwire and check we get the advisory
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcpBrokerUri);
Connection c = factory.createConnection();
c.start();
Session session = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(dest);
Message mess = session.createTextMessage("test");
producer.send(mess);
StompFrame f = stompConnection.receive();
LOG.debug(f);
assertEquals(f.getAction(),"MESSAGE");
assertTrue("Should have a body", f.getBody().length() > 0);
assertTrue(f.getBody().startsWith("{\"ProducerInfo\":"));
c.stop();
c.close();
}
public void testProducerAdvisoryXML() throws Exception {
Destination dest = new ActiveMQQueue("testProducerAdvisoryXML");
HashMap<String, String> subheaders = new HashMap<String, String>(1);
subheaders.put("transformation", Stomp.Transformations.JMS_ADVISORY_XML.toString());
stompConnection.connect("system", "manager");
stompConnection.subscribe("/topic/ActiveMQ.Advisory.Producer.>",
Stomp.Headers.Subscribe.AckModeValues.AUTO, subheaders);
// Now connect via openwire and check we get the advisory
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcpBrokerUri);
Connection c = factory.createConnection();
c.start();
Session session = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(dest);
Message mess = session.createTextMessage("test");
producer.send(mess);
StompFrame f = stompConnection.receive();
LOG.debug(f);
assertEquals(f.getAction(),"MESSAGE");
assertTrue("Should have a body", f.getBody().length() > 0);
assertTrue(f.getBody().startsWith("<ProducerInfo>"));
c.stop();
c.close();
}
public void testProducerAdvisoryJSON() throws Exception {
Destination dest = new ActiveMQQueue("testProducerAdvisoryJSON");
HashMap<String, String> subheaders = new HashMap<String, String>(1);
subheaders.put("transformation", Stomp.Transformations.JMS_ADVISORY_JSON.toString());
stompConnection.connect("system", "manager");
stompConnection.subscribe("/topic/ActiveMQ.Advisory.Producer.>",
Stomp.Headers.Subscribe.AckModeValues.AUTO, subheaders);
// Now connect via openwire and check we get the advisory
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcpBrokerUri);
Connection c = factory.createConnection();
c.start();
Session session = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(dest);
Message mess = session.createTextMessage("test");
producer.send(mess);
StompFrame f = stompConnection.receive();
LOG.debug(f);
assertEquals(f.getAction(),"MESSAGE");
assertTrue("Should have a body", f.getBody().length() > 0);
assertTrue(f.getBody().startsWith("{\"ProducerInfo\":"));
c.stop();
c.close();
}
}

View File

@ -245,7 +245,7 @@ public class StompTest extends CombinationTestSupport {
assertEquals("foo", "abc", message.getStringProperty("foo"));
assertEquals("bar", "123", message.getStringProperty("bar"));
}
public void testSendMessageWithDelay() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@ -823,6 +823,165 @@ public class StompTest extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
public void testTransformationReceiveObject() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
producer.send(message);
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.trim().endsWith(xmlObject));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
public void testTransformationReceiveXMLObjectAndMap() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
ObjectMessage objMessage = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
producer.send(objMessage);
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("name", "Dejan");
mapMessage.setString("city", "Belgrade");
producer.send(mapMessage);
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_XML + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.trim().endsWith(xmlObject));
frame = stompConnection.receiveFrame();
assertTrue(frame.trim().endsWith(xmlMap.trim()));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
public void testTransformationReceiveJSONObjectAndMap() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
ObjectMessage objMessage = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
producer.send(objMessage);
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("name", "Dejan");
mapMessage.setString("city", "Belgrade");
producer.send(mapMessage);
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_JSON + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.trim().endsWith(jsonObject));
frame = stompConnection.receiveFrame();
assertTrue(frame.trim().endsWith(jsonMap.trim()));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
public void testTransformationSendAndReceiveXmlMap() throws Exception {
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_XML + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_MAP_JSON + "\n\n" + jsonMap + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertNotNull(frame);
assertTrue(frame.trim().endsWith(xmlMap.trim()));
assertTrue(frame.contains("jms-map-xml"));
}
public void testTransformationSendAndReceiveJsonMap() throws Exception {
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_JSON + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_MAP_XML + "\n\n" + xmlMap + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertNotNull(frame);
assertTrue(frame.trim().endsWith(jsonMap.trim()));
assertTrue(frame.contains("jms-map-json"));
}
public void testTransformationReceiveBytesMessage() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
BytesMessage message = session.createBytesMessage();
message.writeBytes(new byte[]{1, 2, 3, 4, 5});
producer.send(message);
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_XML + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
Pattern cl = Pattern.compile("Content-length:\\s*(\\d+)", Pattern.CASE_INSENSITIVE);
Matcher clMmatcher = cl.matcher(frame);
assertTrue(clMmatcher.find());
assertEquals("5", clMmatcher.group(1));
assertFalse(Pattern.compile("type:\\s*null", Pattern.CASE_INSENSITIVE).matcher(frame).find());
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
public void testTransformationNotOverrideSubscription() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));