added support for arbitrarily nested Map or List properties on Message together with nesting on MapMessage to fix AMQ-757. For more information see: http://activemq.org/site/structured-message-properties-and-mapmessages.html

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@415642 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-06-20 13:58:34 +00:00
parent b695b490b2
commit ca067a6bbb
14 changed files with 296 additions and 35 deletions

View File

@ -126,6 +126,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
protected boolean alwaysSessionAsync=true;
private boolean useAsyncSend = false;
private boolean optimizeAcknowledge = false;
private boolean nestedMapAndListEnabled = true;
private boolean useRetroactiveConsumer;
private int closeTimeout = 15000;
@ -826,6 +827,18 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
this.useRetroactiveConsumer = useRetroactiveConsumer;
}
public boolean isNestedMapAndListEnabled() {
return nestedMapAndListEnabled;
}
/**
* Enables/disables whether or not Message properties and MapMessage entries support nested Map and List objects
*/
public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
this.nestedMapAndListEnabled = structuredMapsEnabled;
}
/**
* Adds a transport listener so that a client can be notified of events in the underlying
* transport

View File

@ -81,7 +81,7 @@ public class ActiveMQMapMessage extends ActiveMQMessage implements MapMessage {
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_MAP_MESSAGE;
transient protected HashMap map = new HashMap();
transient protected Map map = new HashMap();
public Message copy() {
ActiveMQMapMessage copy = new ActiveMQMapMessage();
@ -624,13 +624,11 @@ public class ActiveMQMapMessage extends ActiveMQMessage implements MapMessage {
public void setObject(String name, Object value) throws JMSException {
initializeWriting();
if (value != null) {
if (value instanceof Number || value instanceof String || value instanceof Boolean ||
value instanceof Byte
|| value instanceof Character || value instanceof byte[]) {
put(name, value);
} else {
throw new MessageFormatException(value.getClass() + " is not a primitive type");
// byte[] not allowed on properties
if (!(value instanceof byte[])) {
checkValidObject(value);
}
put(name, value);
} else {
put(name, null);
}

View File

@ -21,6 +21,7 @@ import java.io.UnsupportedEncodingException;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Vector;
@ -30,6 +31,7 @@ import javax.jms.JMSException;
import javax.jms.MessageFormatException;
import javax.jms.MessageNotWriteableException;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.filter.PropertyExpression;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.util.Callback;
@ -405,11 +407,22 @@ public class ActiveMQMessage extends Message implements javax.jms.Message {
}
private void checkValidObject(Object value) throws MessageFormatException {
if(!(value instanceof Boolean || value instanceof Byte || value instanceof Short || value instanceof Integer ||
value instanceof Long || value instanceof Float || value instanceof Double || value instanceof String ||
value == null)) {
throw new MessageFormatException("Only objectified primitive objects and String types are allowed");
protected void checkValidObject(Object value) throws MessageFormatException {
if (!(value instanceof Boolean || value instanceof Byte || value instanceof Short || value instanceof Integer || value instanceof Long
|| value instanceof Float || value instanceof Double || value instanceof Character || value instanceof String || value == null)) {
ActiveMQConnection conn = getConnection();
// conn is null if we are in the broker rather than a JMS client
if (conn == null || conn.isNestedMapAndListEnabled()) {
if (!(value instanceof Map || value instanceof List)) {
throw new MessageFormatException("Only objectified primitive objects, String, Map and List types are allowed but was: " + value + " type: "
+ value.getClass());
}
}
else {
throw new MessageFormatException("Only objectified primitive objects and String types are allowed but was: " + value + " type: "
+ value.getClass());
}
}
}

View File

@ -70,7 +70,7 @@ abstract public class Message extends BaseCommand implements MarshallAware, Mess
protected int redeliveryCounter;
protected int size;
protected HashMap properties;
protected Map properties;
protected boolean readOnlyProperties = false;
protected boolean readOnlyBody = false;
protected transient boolean recievedByDFBridge = false;
@ -160,7 +160,7 @@ abstract public class Message extends BaseCommand implements MarshallAware, Mess
}
}
private HashMap unmarsallProperties(ByteSequence marshalledProperties) throws IOException {
private Map unmarsallProperties(ByteSequence marshalledProperties) throws IOException {
return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties)));
}

View File

@ -46,7 +46,7 @@ public class WireFormatInfo implements Command, MarshallAware {
protected int version;
protected ByteSequence marshalledProperties;
protected transient HashMap properties;
protected transient Map properties;
private transient Endpoint from;
private transient Endpoint to;
@ -159,7 +159,7 @@ public class WireFormatInfo implements Command, MarshallAware {
}
}
private HashMap unmarsallProperties(ByteSequence marshalledProperties) throws IOException {
private Map unmarsallProperties(ByteSequence marshalledProperties) throws IOException {
return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties)), MAX_PROPERTY_SIZE);
}

View File

@ -22,8 +22,11 @@ import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UTFDataFormatException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
*
@ -46,8 +49,10 @@ public class MarshallingSupport {
public static final byte FLOAT_TYPE = 8;
public static final byte STRING_TYPE = 9;
public static final byte BYTE_ARRAY_TYPE = 10;
public static final byte MAP_TYPE = 11;
public static final byte LIST_TYPE = 12;
static public void marshalPrimitiveMap(HashMap map, DataOutputStream out) throws IOException {
static public void marshalPrimitiveMap(Map map, DataOutputStream out) throws IOException {
if( map == null ) {
out.writeInt(-1);
} else {
@ -61,7 +66,7 @@ public class MarshallingSupport {
}
}
static public HashMap unmarshalPrimitiveMap(DataInputStream in) throws IOException {
static public Map unmarshalPrimitiveMap(DataInputStream in) throws IOException {
return unmarshalPrimitiveMap(in, Integer.MAX_VALUE);
}
@ -71,7 +76,7 @@ public class MarshallingSupport {
* @throws IOException
* @throws IOException
*/
public static HashMap unmarshalPrimitiveMap(DataInputStream in, int max_property_size) throws IOException {
public static Map unmarshalPrimitiveMap(DataInputStream in, int max_property_size) throws IOException {
int size = in.readInt();
if( size > max_property_size ) {
throw new IOException("Primitive map is larger than the allowed size: "+size);
@ -89,6 +94,23 @@ public class MarshallingSupport {
}
public static void marshalPrimitiveList(List list, DataOutputStream out) throws IOException {
out.writeInt(list.size());
for (Iterator iter = list.iterator(); iter.hasNext();) {
Object element = (Object) iter.next();
marshalPrimitive(out, element);
}
}
public static List unmarshalPrimitiveList(DataInputStream in) throws IOException {
int size = in.readInt();
List answer = new ArrayList(size);
while (size-- > 0) {
answer.add(unmarshalPrimitive(in));
}
return answer;
}
static public void marshalPrimitive(DataOutputStream out, Object value) throws IOException {
if( value == null ) {
out.writeByte(NULL);
@ -123,11 +145,18 @@ public class MarshallingSupport {
} else if( value.getClass() == String.class ) {
out.writeByte(STRING_TYPE);
out.writeUTF((String)value);
} else if( value instanceof Map) {
out.writeByte(MAP_TYPE);
marshalPrimitiveMap((Map) value, out);
} else if( value instanceof List) {
out.writeByte(LIST_TYPE);
marshalPrimitiveList((List) value, out);
} else {
throw new IOException("Object is not a primitive: "+value);
}
}
static public Object unmarshalPrimitive(DataInputStream in) throws IOException {
Object value=null;
switch( in.readByte() ) {
@ -162,6 +191,12 @@ public class MarshallingSupport {
case STRING_TYPE:
value = in.readUTF();
break;
case MAP_TYPE:
value = unmarshalPrimitiveMap(in);
break;
case LIST_TYPE:
value = unmarshalPrimitiveList(in);
break;
}
return value;
}

View File

@ -28,7 +28,7 @@ import org.apache.commons.logging.LogFactory;
* @version $Revision: 1.0 $
*/
public class AutoFailTestSupport extends TestCase {
public abstract class AutoFailTestSupport extends TestCase {
protected static final Log log = LogFactory.getLog(AutoFailTestSupport.class);
public static final int EXIT_SUCCESS = 0;

View File

@ -65,7 +65,7 @@ import org.apache.commons.logging.LogFactory;
*
* @version $Revision: 1.5 $
*/
public class CombinationTestSupport extends AutoFailTestSupport {
public abstract class CombinationTestSupport extends AutoFailTestSupport {
protected static final Log log = LogFactory.getLog(CombinationTestSupport.class);

View File

@ -241,6 +241,8 @@ public class ActiveMQMapMessageTest extends TestCase {
msg.setObject("short", shortValue);
msg.setObject("string", stringValue);
} catch (MessageFormatException mfe) {
System.out.println("Caught: " + mfe);
mfe.printStackTrace();
fail("object formats should be correct");
}

View File

@ -29,7 +29,7 @@ import org.apache.activeio.packet.Packet;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.openwire.OpenWireFormat;
public class DataStructureTestSupport extends CombinationTestSupport {
public abstract class DataStructureTestSupport extends CombinationTestSupport {
public boolean cacheEnabled;
public WireFormat wireFormat;

View File

@ -39,7 +39,7 @@ import org.apache.commons.logging.LogFactory;
/**
* @version $Revision: 1.2 $
*/
public class JmsSendReceiveTestSupport extends TestSupport implements MessageListener {
public abstract class JmsSendReceiveTestSupport extends TestSupport implements MessageListener {
protected static final Log log = LogFactory.getLog(JmsSendReceiveTestSupport.class);
protected int messageCount = 100;
@ -111,7 +111,7 @@ public class JmsSendReceiveTestSupport extends TestSupport implements MessageLis
messages.clear();
for (int i = 0; i < data.length; i++) {
Message message = session.createTextMessage(data[i]);
Message message = createMessage(i);
configureMessage(message);
if (verbose) {
log.info("About to send a message: " + message + " with text: " + data[i]);
@ -124,6 +124,12 @@ public class JmsSendReceiveTestSupport extends TestSupport implements MessageLis
log.info("" + data.length + " messages(s) received, closing down connections");
}
protected Message createMessage(int index) throws JMSException {
Message message = session.createTextMessage(data[index]);
return message;
}
/**
* A hook to allow the message to be configured such as adding extra headers
* @throws JMSException
@ -155,25 +161,31 @@ public class JmsSendReceiveTestSupport extends TestSupport implements MessageLis
if (data.length != copyOfMessages.size()) {
for (Iterator iter = copyOfMessages.iterator(); iter.hasNext();) {
TextMessage message = (TextMessage) iter.next();
log.info("<== " + counter++ + " = " + message.getText());
Object message = iter.next();
log.info("<== " + counter++ + " = " + message);
}
}
assertEquals("Not enough messages received", data.length, receivedMessages.size());
for (int i = 0; i < data.length; i++) {
TextMessage received = (TextMessage) receivedMessages.get(i);
String text = received.getText();
if (verbose) {
log.info("Received Text: " + text);
}
assertEquals("Message: " + i, data[i], text);
Message received = (Message) receivedMessages.get(i);
assertMessageValid(i, received);
}
}
protected void assertMessageValid(int index, Message message) throws JMSException {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
if (verbose) {
log.info("Received Text: " + text);
}
assertEquals("Message: " + index, data[index], text);
}
/**
* Waits for the messages to be delivered or when the wait time has been reached.
*/

View File

@ -39,7 +39,7 @@ import org.apache.commons.logging.LogFactory;
*
* @version $Revision: 1.4 $
*/
public class TestSupport extends TestCase {
public abstract class TestSupport extends TestCase {
final static protected Log log = LogFactory.getLog(TestSupport.class);
protected ActiveMQConnectionFactory connectionFactory;
protected boolean topic = true;

View File

@ -0,0 +1,92 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.test.message;
import org.apache.activemq.test.JmsTopicSendReceiveWithTwoConnectionsAndEmbeddedBrokerTest;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Tests that a Message can have nested Map and List properties attached.
*
* @version $Revision$
*/
public class NestedMapAndListPropertyTest extends JmsTopicSendReceiveWithTwoConnectionsAndEmbeddedBrokerTest {
private static final Log log = LogFactory.getLog(NestedMapAndListPropertyTest.class);
protected void assertMessageValid(int index, Message message) throws JMSException {
Object value = message.getObjectProperty("textField");
assertEquals("textField", data[index], value);
Map map = (Map) message.getObjectProperty("mapField");
assertNotNull(map);
assertEquals("mapField.a", "foo", map.get("a"));
assertEquals("mapField.b", new Integer(23), map.get("b"));
assertEquals("mapField.c", new Long(45), map.get("c"));
value = map.get("d");
assertTrue("mapField.d should be a Map", value instanceof Map);
map = (Map) value;
assertEquals("mapField.d.x", "abc", map.get("x"));
value = map.get("y");
assertTrue("mapField.d.y is a List", value instanceof List);
List list = (List) value;
log.debug("mapField.d.y: " + list);
assertEquals("listField.size", 3, list.size());
log.debug("Found map: " + map);
list = (List) message.getObjectProperty("listField");
log.debug("listField: " + list);
assertEquals("listField.size", 3, list.size());
assertEquals("listField[0]", "a", list.get(0));
assertEquals("listField[1]", "b", list.get(1));
assertEquals("listField[2]", "c", list.get(2));
}
protected Message createMessage(int index) throws JMSException {
Message answer = session.createMessage();
answer.setStringProperty("textField", data[index]);
Map grandChildMap = new HashMap();
grandChildMap.put("x", "abc");
grandChildMap.put("y", Arrays.asList(new Object[] { "a", "b", "c" }));
Map nestedMap = new HashMap();
nestedMap.put("a", "foo");
nestedMap.put("b", new Integer(23));
nestedMap.put("c", new Long(45));
nestedMap.put("d", grandChildMap);
answer.setObjectProperty("mapField", nestedMap);
answer.setObjectProperty("listField", Arrays.asList(new Object[] { "a", "b", "c" }));
return answer;
}
}

View File

@ -0,0 +1,96 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.test.message;
import org.apache.activemq.test.JmsTopicSendReceiveWithTwoConnectionsAndEmbeddedBrokerTest;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
*
* @version $Revision$
*/
public class NestedMapMessageTest extends JmsTopicSendReceiveWithTwoConnectionsAndEmbeddedBrokerTest {
private static final Log log = LogFactory.getLog(NestedMapMessageTest.class);
protected void assertMessageValid(int index, Message message) throws JMSException {
assertTrue("Should be a MapMessage: " + message, message instanceof MapMessage);
MapMessage mapMessage = (MapMessage) message;
Object value = mapMessage.getObject("textField");
assertEquals("textField", data[index], value);
Map map = (Map) mapMessage.getObject("mapField");
assertNotNull(map);
assertEquals("mapField.a", "foo", map.get("a"));
assertEquals("mapField.b", new Integer(23), map.get("b"));
assertEquals("mapField.c", new Long(45), map.get("c"));
value = map.get("d");
assertTrue("mapField.d should be a Map", value instanceof Map);
map = (Map) value;
assertEquals("mapField.d.x", "abc", map.get("x"));
value = map.get("y");
assertTrue("mapField.d.y is a List", value instanceof List);
List list = (List) value;
log.debug("mapField.d.y: " + list);
assertEquals("listField.size", 3, list.size());
log.debug("Found map: " + map);
list = (List) mapMessage.getObject("listField");
log.debug("listField: " + list);
assertEquals("listField.size", 3, list.size());
assertEquals("listField[0]", "a", list.get(0));
assertEquals("listField[1]", "b", list.get(1));
assertEquals("listField[2]", "c", list.get(2));
}
protected Message createMessage(int index) throws JMSException {
MapMessage answer = session.createMapMessage();
answer.setString("textField", data[index]);
Map grandChildMap = new HashMap();
grandChildMap.put("x", "abc");
grandChildMap.put("y", Arrays.asList(new Object[] { "a", "b", "c" }));
Map nestedMap = new HashMap();
nestedMap.put("a", "foo");
nestedMap.put("b", new Integer(23));
nestedMap.put("c", new Long(45));
nestedMap.put("d", grandChildMap);
answer.setObject("mapField", nestedMap);
answer.setObject("listField", Arrays.asList(new Object[] { "a", "b", "c" }));
return answer;
}
}