git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1411899 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-11-20 21:45:43 +00:00
parent badc1868bb
commit 72dfbfa283
2 changed files with 133 additions and 26 deletions

View File

@ -34,8 +34,10 @@ import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQObjectMessage; import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.DataStructure; import org.apache.activemq.command.DataStructure;
import org.codehaus.jettison.mapped.Configuration; import org.codehaus.jettison.mapped.Configuration;
import org.fusesource.hawtbuf.UTF8Buffer;
import com.thoughtworks.xstream.XStream; import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.converters.basic.AbstractSingleValueConverter;
import com.thoughtworks.xstream.io.HierarchicalStreamReader; import com.thoughtworks.xstream.io.HierarchicalStreamReader;
import com.thoughtworks.xstream.io.HierarchicalStreamWriter; import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver; import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver;
@ -55,11 +57,12 @@ public class JmsFrameTranslator extends LegacyFrameTranslator implements
XStream xStream = null; XStream xStream = null;
BrokerContext brokerContext; BrokerContext brokerContext;
@Override
public ActiveMQMessage convertFrame(ProtocolConverter converter, public ActiveMQMessage convertFrame(ProtocolConverter converter,
StompFrame command) throws JMSException, ProtocolException { StompFrame command) throws JMSException, ProtocolException {
Map<String, String> headers = command.getHeaders(); Map<String, String> headers = command.getHeaders();
ActiveMQMessage msg; ActiveMQMessage msg;
String transformation = (String) headers.get(Stomp.Headers.TRANSFORMATION); String transformation = headers.get(Stomp.Headers.TRANSFORMATION);
if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH) || transformation.equals(Stomp.Transformations.JMS_BYTE.toString())) { if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH) || transformation.equals(Stomp.Transformations.JMS_BYTE.toString())) {
msg = super.convertFrame(converter, command); msg = super.convertFrame(converter, command);
} else { } else {
@ -96,6 +99,7 @@ public class JmsFrameTranslator extends LegacyFrameTranslator implements
return msg; return msg;
} }
@Override
public StompFrame convertMessage(ProtocolConverter converter, public StompFrame convertMessage(ProtocolConverter converter,
ActiveMQMessage message) throws IOException, JMSException { ActiveMQMessage message) throws IOException, JMSException {
if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) { if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) {
@ -136,8 +140,7 @@ public class JmsFrameTranslator extends LegacyFrameTranslator implements
ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy(); ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
command.setContent(marshall((Serializable)msg.getContentMap(), command.setContent(marshall((Serializable)msg.getContentMap(),
headers.get(Stomp.Headers.TRANSFORMATION)) headers.get(Stomp.Headers.TRANSFORMATION)).getBytes("UTF-8"));
.getBytes("UTF-8"));
return command; return command;
} else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE && } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE &&
AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) { AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
@ -168,8 +171,7 @@ public class JmsFrameTranslator extends LegacyFrameTranslator implements
/** /**
* Marshalls the Object to a string using XML or JSON encoding * Marshalls the Object to a string using XML or JSON encoding
*/ */
protected String marshall(Serializable object, String transformation) protected String marshall(Serializable object, String transformation) throws JMSException {
throws JMSException {
StringWriter buffer = new StringWriter(); StringWriter buffer = new StringWriter();
HierarchicalStreamWriter out; HierarchicalStreamWriter out;
if (transformation.toLowerCase(Locale.ENGLISH).endsWith("json")) { if (transformation.toLowerCase(Locale.ENGLISH).endsWith("json")) {
@ -246,12 +248,30 @@ public class JmsFrameTranslator extends LegacyFrameTranslator implements
if (xstream == null) { if (xstream == null) {
xstream = new XStream(); xstream = new XStream();
} }
return xstream;
// For any object whose elements contains an UTF8Buffer instance instead of a String
// type we map it to String both in and out such that we don't marshal UTF8Buffers out
xstream.registerConverter(new AbstractSingleValueConverter() {
@Override
public Object fromString(String str) {
return str;
}
@SuppressWarnings("rawtypes")
@Override
public boolean canConvert(Class type) {
return type.equals(UTF8Buffer.class);
}
});
xstream.alias("string", UTF8Buffer.class);
return xstream;
} }
@Override
public void setBrokerContext(BrokerContext brokerContext) { public void setBrokerContext(BrokerContext brokerContext) {
this.brokerContext = brokerContext; this.brokerContext = brokerContext;
} }
} }

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.io.StringReader;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
@ -55,12 +56,20 @@ import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.io.HierarchicalStreamReader;
import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver;
import com.thoughtworks.xstream.io.xml.XppReader;
import com.thoughtworks.xstream.io.xml.xppdom.XppFactory;
public class StompTest extends StompTestSupport { public class StompTest extends StompTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(StompTest.class); private static final Logger LOG = LoggerFactory.getLogger(StompTest.class);
protected Connection connection; protected Connection connection;
protected Session session; protected Session session;
protected ActiveMQQueue queue; protected ActiveMQQueue queue;
protected XStream xstream;
private final String xmlObject = "<pojo>\n" private final String xmlObject = "<pojo>\n"
+ " <name>Dejan</name>\n" + " <name>Dejan</name>\n"
+ " <city>Belgrade</city>\n" + " <city>Belgrade</city>\n"
@ -120,6 +129,8 @@ public class StompTest extends StompTestSupport {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queue = new ActiveMQQueue(getQueueName()); queue = new ActiveMQQueue(getQueueName());
connection.start(); connection.start();
xstream = new XStream();
xstream.processAnnotations(SamplePojo.class);
} }
@Override @Override
@ -134,7 +145,7 @@ public class StompTest extends StompTestSupport {
} }
@Override @Override
protected void addStompConnector() throws Exception { protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("stomp://0.0.0.0:"+port); TransportConnector connector = brokerService.addConnector("stomp://0.0.0.0:"+port);
port = connector.getConnectUri().getPort(); port = connector.getConnectUri().getPort();
} }
@ -1173,9 +1184,15 @@ public class StompTest extends StompTestSupport {
assertTrue(frame.trim().endsWith(xmlObject)); assertTrue(frame.trim().endsWith(xmlObject));
frame = stompConnection.receiveFrame(); StompFrame xmlFrame = stompConnection.receive();
assertTrue(frame.trim().endsWith(xmlMap.trim())); Map<String, String> map = createMapFromXml(xmlFrame.getBody());
assertTrue(map.containsKey("name"));
assertTrue(map.containsKey("city"));
assertTrue(map.get("name").equals("Dejan"));
assertTrue(map.get("city").equals("Belgrade"));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
@ -1201,13 +1218,23 @@ public class StompTest extends StompTestSupport {
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_JSON + "\n\n" + Stomp.NULL; frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_JSON + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame(); StompFrame json = stompConnection.receive();
LOG.info("Transformed frame: {}", json);
assertTrue(frame.trim().endsWith(jsonObject)); SamplePojo pojo = createObjectFromJson(json.getBody());
assertTrue(pojo.getCity().equals("Belgrade"));
assertTrue(pojo.getName().equals("Dejan"));
frame = stompConnection.receiveFrame(); json = stompConnection.receive();
LOG.info("Transformed frame: {}", json);
assertTrue(frame.trim().endsWith(jsonMap.trim())); Map<String, String> map = createMapFromJson(json.getBody());
assertTrue(map.containsKey("name"));
assertTrue(map.containsKey("city"));
assertTrue(map.get("name").equals("Dejan"));
assertTrue(map.get("city").equals("Belgrade"));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
@ -1229,11 +1256,18 @@ public class StompTest extends StompTestSupport {
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame(); StompFrame xmlFrame = stompConnection.receive();
LOG.info("Received Frame: {}", xmlFrame.getBody());
assertNotNull(frame); Map<String, String> map = createMapFromXml(xmlFrame.getBody());
assertTrue(frame.trim().endsWith(xmlMap.trim()));
assertTrue(frame.contains("jms-map-xml")); assertTrue(map.containsKey("name"));
assertTrue(map.containsKey("city"));
assertEquals("Dejan", map.get("name"));
assertEquals("Belgrade", map.get("city"));
assertTrue(xmlFrame.getHeaders().containsValue("jms-map-xml"));
} }
@Test @Test
@ -1252,11 +1286,19 @@ public class StompTest extends StompTestSupport {
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame(); StompFrame json = stompConnection.receive();
LOG.info("Received Frame: {}", json.getBody());
assertNotNull(frame); assertNotNull(json);
assertTrue(frame.trim().endsWith(jsonMap.trim())); assertTrue(json.getHeaders().containsValue("jms-map-json"));
assertTrue(frame.contains("jms-map-json"));
Map<String, String> map = createMapFromJson(json.getBody());
assertTrue(map.containsKey("name"));
assertTrue(map.containsKey("city"));
assertEquals("Dejan", map.get("name"));
assertEquals("Belgrade", map.get("city"));
} }
@Test @Test
@ -1394,9 +1436,16 @@ public class StompTest extends StompTestSupport {
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto\n" + "transformation:" + Stomp.Transformations.JMS_MAP_XML + "\n\n" + Stomp.NULL; frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto\n" + "transformation:" + Stomp.Transformations.JMS_MAP_XML + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame(); StompFrame xmlFrame = stompConnection.receive();
LOG.info("Received Frame: {}", xmlFrame.getBody());
assertTrue(frame.trim().endsWith(xmlMap.trim())); Map<String, String> map = createMapFromXml(xmlFrame.getBody());
assertTrue(map.containsKey("name"));
assertTrue(map.containsKey("city"));
assertEquals("Dejan", map.get("name"));
assertEquals("Belgrade", map.get("city"));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
@ -1420,9 +1469,16 @@ public class StompTest extends StompTestSupport {
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto\n" + "transformation:" + Stomp.Transformations.JMS_MAP_JSON + "\n\n" + Stomp.NULL; frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto\n" + "transformation:" + Stomp.Transformations.JMS_MAP_JSON + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame(); StompFrame json = stompConnection.receive();
LOG.info("Received Frame: {}", json.getBody());
assertNotNull(json);
Map<String, String> map = createMapFromJson(json.getBody());
assertTrue(frame.trim().endsWith(jsonMap.trim())); assertTrue(map.containsKey("name"));
assertTrue(map.containsKey("city"));
assertEquals("Dejan", map.get("name"));
assertEquals("Belgrade", map.get("city"));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
@ -2262,4 +2318,35 @@ public class StompTest extends StompTestSupport {
assertEquals("MESSAGE", sframe.getAction()); assertEquals("MESSAGE", sframe.getAction());
assertEquals(bigBody, sframe.getBody()); assertEquals(bigBody, sframe.getBody());
} }
protected SamplePojo createObjectFromJson(String data) throws Exception {
HierarchicalStreamReader in = new JettisonMappedXmlDriver().createReader(new StringReader(data));
return createObject(in);
}
protected SamplePojo createObjectFromXml(String data) throws Exception {
HierarchicalStreamReader in = new XppReader(new StringReader(data), XppFactory.createDefaultParser());
return createObject(in);
}
private SamplePojo createObject(HierarchicalStreamReader in) throws Exception {
SamplePojo pojo = (SamplePojo) xstream.unmarshal(in);
return pojo;
}
protected Map<String, String> createMapFromJson(String data) throws Exception {
HierarchicalStreamReader in = new JettisonMappedXmlDriver().createReader(new StringReader(data));
return createMapObject(in);
}
protected Map<String, String> createMapFromXml(String data) throws Exception {
HierarchicalStreamReader in = new XppReader(new StringReader(data), XppFactory.createDefaultParser());
return createMapObject(in);
}
@SuppressWarnings("unchecked")
private Map<String, String> createMapObject(HierarchicalStreamReader in) throws Exception {
Map<String, String> map = (Map<String, String>)xstream.unmarshal(in);
return map;
}
} }