mirror of https://github.com/apache/activemq.git
AMQ-7309 - Implement Message#isBodyAssignableTo and Message#getBody methods (#979)
AMQ-7309 - Implement Message#isBodyAssignableTo and Message#getBody methods
This commit is contained in:
parent
ae4e305f85
commit
9fe24cd8e3
|
@ -108,33 +108,7 @@ public class ActiveMQProducer implements JMSProducer {
|
|||
if (body != null) {
|
||||
try {
|
||||
for (Map.Entry<String, Object> mapEntry : body.entrySet()) {
|
||||
final String key = mapEntry.getKey();
|
||||
final Object value = mapEntry.getValue();
|
||||
final Class<?> valueObject = value.getClass();
|
||||
if (String.class.isAssignableFrom(valueObject)) {
|
||||
mapMessage.setString(key, String.class.cast(value));
|
||||
} else if (Integer.class.isAssignableFrom(valueObject)) {
|
||||
mapMessage.setInt(key, Integer.class.cast(value));
|
||||
} else if (Long.class.isAssignableFrom(valueObject)) {
|
||||
mapMessage.setLong(key, Long.class.cast(value));
|
||||
} else if (Double.class.isAssignableFrom(valueObject)) {
|
||||
mapMessage.setDouble(key, Double.class.cast(value));
|
||||
} else if (Boolean.class.isAssignableFrom(valueObject)) {
|
||||
mapMessage.setBoolean(key, Boolean.class.cast(value));
|
||||
} else if (Character.class.isAssignableFrom(valueObject)) {
|
||||
mapMessage.setChar(key, Character.class.cast(value));
|
||||
} else if (Short.class.isAssignableFrom(valueObject)) {
|
||||
mapMessage.setShort(key, Short.class.cast(value));
|
||||
} else if (Float.class.isAssignableFrom(valueObject)) {
|
||||
mapMessage.setFloat(key, Float.class.cast(value));
|
||||
} else if (Byte.class.isAssignableFrom(valueObject)) {
|
||||
mapMessage.setByte(key, Byte.class.cast(value));
|
||||
} else if (byte[].class.isAssignableFrom(valueObject)) {
|
||||
byte[] array = byte[].class.cast(value);
|
||||
mapMessage.setBytes(key, array, 0, array.length);
|
||||
} else {
|
||||
mapMessage.setObject(key, value);
|
||||
}
|
||||
mapMessage.setObject(mapEntry.getKey(), mapEntry.getValue());
|
||||
}
|
||||
} catch (JMSException e) {
|
||||
throw new MessageFormatRuntimeException(e.getMessage());
|
||||
|
|
|
@ -956,4 +956,21 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public boolean isBodyAssignableTo(Class c) {
|
||||
return getContent() == null || c.isAssignableFrom(byte[].class);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected <T> T doGetBody(Class<T> asType) {
|
||||
//Make sure the bytes are stored before trying to copy and return
|
||||
if (dataOut != null && getContent() == null) {
|
||||
storeContent();
|
||||
}
|
||||
|
||||
final ByteSequence content = getContent();
|
||||
return content != null ? (T) content.toArray() : null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.io.ObjectStreamException;
|
||||
import java.io.OutputStream;
|
||||
import java.io.Serializable;
|
||||
import java.util.Collections;
|
||||
import java.util.Enumeration;
|
||||
import java.util.HashMap;
|
||||
|
@ -174,9 +175,16 @@ public class ActiveMQMapMessage extends ActiveMQMessage implements MapMessage {
|
|||
* @throws IOException
|
||||
*/
|
||||
private void loadContent() throws JMSException {
|
||||
if (getContent() != null && map.isEmpty()) {
|
||||
map = deserialize(getContent());
|
||||
}
|
||||
}
|
||||
|
||||
private Map<String, Object> deserialize(ByteSequence content) throws JMSException {
|
||||
final Map<String, Object> map;
|
||||
|
||||
try {
|
||||
if (getContent() != null && map.isEmpty()) {
|
||||
ByteSequence content = getContent();
|
||||
if (content != null) {
|
||||
InputStream is = new ByteArrayInputStream(content);
|
||||
if (isCompressed()) {
|
||||
is = new InflaterInputStream(is);
|
||||
|
@ -184,10 +192,14 @@ public class ActiveMQMapMessage extends ActiveMQMessage implements MapMessage {
|
|||
DataInputStream dataIn = new DataInputStream(is);
|
||||
map = MarshallingSupport.unmarshalPrimitiveMap(dataIn);
|
||||
dataIn.close();
|
||||
} else {
|
||||
map = new HashMap<>();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw JMSExceptionSupport.create(e);
|
||||
}
|
||||
|
||||
return map;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -827,4 +839,30 @@ public class ActiveMQMapMessage extends ActiveMQMessage implements MapMessage {
|
|||
initializeReading();
|
||||
return map;
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public boolean isBodyAssignableTo(Class c) throws JMSException {
|
||||
final Map<String, Object> map = getContentMap();
|
||||
if (map == null || map.isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
return c.isAssignableFrom(java.util.Map.class);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected <T> T doGetBody(Class<T> asType) throws JMSException {
|
||||
storeContent();
|
||||
final ByteSequence content = getContent();
|
||||
final Map<String, Object> map = content != null ? deserialize(content) : null;
|
||||
|
||||
//This implementation treats an empty map as not having a body so if empty
|
||||
//we should return null as well
|
||||
if (map != null && !map.isEmpty()) {
|
||||
map.replaceAll((k, v) -> v instanceof UTF8Buffer ? v.toString() : v);
|
||||
return (T) map;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -795,14 +795,22 @@ public class ActiveMQMessage extends Message implements org.apache.activemq.Mess
|
|||
this.deliveryTime = deliveryTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T getBody(Class<T> c) throws JMSException {
|
||||
throw new UnsupportedOperationException("getBody(Class<T>) is not supported");
|
||||
}
|
||||
@Override
|
||||
public final <T> T getBody(Class<T> asType) throws JMSException {
|
||||
if (isBodyAssignableTo(asType)) {
|
||||
return doGetBody(asType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isBodyAssignableTo(Class c) throws JMSException {
|
||||
throw new UnsupportedOperationException("isBodyAssignableTo(Class) is not supported");
|
||||
}
|
||||
throw new MessageFormatException("Message body cannot be read as type: " + asType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isBodyAssignableTo(Class c) throws JMSException {
|
||||
return true;
|
||||
}
|
||||
|
||||
protected <T> T doGetBody(Class<T> asType) throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -194,9 +194,18 @@ public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMess
|
|||
*/
|
||||
@Override
|
||||
public Serializable getObject() throws JMSException {
|
||||
if (object == null && getContent() != null) {
|
||||
final ByteSequence content = getContent();
|
||||
if (object == null && content != null) {
|
||||
this.object = deserialize(content);
|
||||
}
|
||||
return this.object;
|
||||
}
|
||||
|
||||
private Serializable deserialize(ByteSequence content) throws JMSException {
|
||||
Serializable object = null;
|
||||
|
||||
if (content != null) {
|
||||
try {
|
||||
ByteSequence content = getContent();
|
||||
InputStream is = new ByteArrayInputStream(content);
|
||||
if (isCompressed()) {
|
||||
is = new InflaterInputStream(is);
|
||||
|
@ -216,7 +225,7 @@ public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMess
|
|||
throw JMSExceptionSupport.create("Failed to build body from bytes. Reason: " + e, e);
|
||||
}
|
||||
}
|
||||
return this.object;
|
||||
return object;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -277,4 +286,20 @@ public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMess
|
|||
trustedPackages = Arrays.asList(ClassLoadingAwareObjectInputStream.serializablePackages);
|
||||
trustAllPackages = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isBodyAssignableTo(Class c) throws JMSException {
|
||||
final Serializable object = getObject();
|
||||
if (object == null) {
|
||||
return true;
|
||||
}
|
||||
return Serializable.class == c || Object.class == c || c.isInstance(object);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected <T> T doGetBody(Class<T> asType) throws JMSException {
|
||||
storeContent();
|
||||
final ByteSequence content = getContent();
|
||||
return content != null ? (T) deserialize(content) : null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1218,4 +1218,9 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
|
|||
public String toString() {
|
||||
return super.toString() + " ActiveMQStreamMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }";
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isBodyAssignableTo(Class c) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -210,4 +210,21 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
|
|||
}
|
||||
return super.toString();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public boolean isBodyAssignableTo(Class c) throws JMSException {
|
||||
/*
|
||||
* If null the JMS spec says this method always returns true
|
||||
* regardless of the passed in class type.
|
||||
*/
|
||||
if (getText() == null) {
|
||||
return true;
|
||||
}
|
||||
return c.isAssignableFrom(java.lang.String.class);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected <T> T doGetBody(Class<T> asType) throws JMSException {
|
||||
return (T) getText();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
|
||||
package org.apache.activemq.util;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
public class ByteSequence {
|
||||
|
||||
public byte[] data;
|
||||
|
@ -126,4 +128,14 @@ public class ByteSequence {
|
|||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Makes a deep copy of the data into a new byte array
|
||||
* starting at the offset.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public byte[] toArray() {
|
||||
return Arrays.copyOfRange(getData(), getOffset(), getLength());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,196 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.jms2;
|
||||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Map;
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MapMessage;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageFormatException;
|
||||
import javax.jms.ObjectMessage;
|
||||
import javax.jms.StreamMessage;
|
||||
import javax.jms.TextMessage;
|
||||
import org.apache.activemq.command.ActiveMQBytesMessage;
|
||||
import org.apache.activemq.command.ActiveMQMapMessage;
|
||||
import org.apache.activemq.command.ActiveMQMessage;
|
||||
import org.apache.activemq.command.ActiveMQObjectMessage;
|
||||
import org.apache.activemq.command.ActiveMQStreamMessage;
|
||||
import org.apache.activemq.command.ActiveMQTextMessage;
|
||||
import org.apache.activemq.util.ByteSequence;
|
||||
import org.junit.Test;
|
||||
|
||||
public class ActiveMQJMS2MessageTest {
|
||||
|
||||
@Test
|
||||
public void testMessageIsAssignableTo() throws JMSException {
|
||||
Message message = new ActiveMQMessage();
|
||||
assertTrue(message.isBodyAssignableTo(String.class));
|
||||
assertTrue(message.isBodyAssignableTo(Integer.class));
|
||||
assertTrue(message.isBodyAssignableTo(Object.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMessageGetBody() throws JMSException {
|
||||
Message message = new ActiveMQMessage();
|
||||
assertNull(message.getBody(String.class));
|
||||
assertNull(message.getBody(Object.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStringMessageIsAssignableTo() throws JMSException {
|
||||
TextMessage nullBody = new ActiveMQTextMessage();
|
||||
assertTrue(nullBody.isBodyAssignableTo(String.class));
|
||||
//Spec says type is ignored and returns true if null body
|
||||
assertTrue(nullBody.isBodyAssignableTo(Integer.class));
|
||||
|
||||
TextMessage message = new ActiveMQTextMessage();
|
||||
message.setText("Test message");
|
||||
assertTrue(message.isBodyAssignableTo(String.class));
|
||||
assertFalse(message.isBodyAssignableTo(Integer.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStringMessageGetBody() throws JMSException {
|
||||
TextMessage nullMessage = new ActiveMQTextMessage();
|
||||
assertNull(nullMessage.getBody(String.class));
|
||||
|
||||
TextMessage message = new ActiveMQTextMessage();
|
||||
message.setText("Test message");
|
||||
assertEquals("Test message", message.getBody(String.class));
|
||||
}
|
||||
|
||||
@Test(expected = MessageFormatException.class)
|
||||
public void testStringMessageGetBodyWrongType() throws JMSException {
|
||||
TextMessage message = new ActiveMQTextMessage();
|
||||
message.setText("Test message");
|
||||
message.getBody(Integer.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testByteMessageIsAssignableTo() throws JMSException {
|
||||
BytesMessage nullBody = new ActiveMQBytesMessage();
|
||||
//Spec says type is ignored and returns true if null body
|
||||
assertTrue(nullBody.isBodyAssignableTo(String.class));
|
||||
assertTrue(nullBody.isBodyAssignableTo(Integer.class));
|
||||
|
||||
ByteSequence testBytes = new ByteSequence("test".getBytes());
|
||||
ActiveMQBytesMessage message = new ActiveMQBytesMessage();
|
||||
message.setContent(testBytes);
|
||||
assertArrayEquals(testBytes.getData(), message.getBody(byte[].class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testByteMessageGetBody() throws JMSException {
|
||||
BytesMessage nullMessage = new ActiveMQBytesMessage();
|
||||
assertNull(nullMessage.getBody(String.class));
|
||||
|
||||
ByteSequence testBytes = new ByteSequence("test".getBytes());
|
||||
ActiveMQBytesMessage message = new ActiveMQBytesMessage();
|
||||
message.setContent(testBytes);
|
||||
assertArrayEquals(testBytes.getData(), message.getBody(byte[].class));
|
||||
}
|
||||
|
||||
@Test(expected = MessageFormatException.class)
|
||||
public void testByteMessageGetBodyWrongType() throws JMSException {
|
||||
ByteSequence testBytes = new ByteSequence("test".getBytes());
|
||||
ActiveMQBytesMessage message = new ActiveMQBytesMessage();
|
||||
message.setContent(testBytes);
|
||||
message.getBody(Integer.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testObjectMessageIsAssignableTo() throws JMSException {
|
||||
ObjectMessage nullBody = new ActiveMQObjectMessage();
|
||||
//Spec says type is ignored and returns true if null body
|
||||
assertTrue(nullBody.isBodyAssignableTo(String.class));
|
||||
assertTrue(nullBody.isBodyAssignableTo(Object.class));
|
||||
assertTrue(nullBody.isBodyAssignableTo(Integer.class));
|
||||
|
||||
ObjectMessage message = new ActiveMQObjectMessage();
|
||||
message.setObject("Test message");
|
||||
assertTrue(message.isBodyAssignableTo(String.class));
|
||||
assertFalse(message.isBodyAssignableTo(Integer.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testObjectMessageGetBody() throws JMSException {
|
||||
ObjectMessage nullMessage = new ActiveMQObjectMessage();
|
||||
assertNull(nullMessage.getBody(String.class));
|
||||
|
||||
ObjectMessage message = new ActiveMQObjectMessage();
|
||||
message.setObject("Test message");
|
||||
assertEquals("Test message", message.getBody(Serializable.class));
|
||||
assertEquals("Test message", message.getBody(String.class));
|
||||
}
|
||||
|
||||
@Test(expected = MessageFormatException.class)
|
||||
public void testObjectMessageGetBodyWrongType() throws JMSException {
|
||||
ObjectMessage message = new ActiveMQObjectMessage();
|
||||
message.setObject("Test message");
|
||||
message.getBody(Integer.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMapMessageIsAssignableTo() throws JMSException {
|
||||
MapMessage nullBody = new ActiveMQMapMessage();
|
||||
//Spec says type is ignored and returns true if null body
|
||||
assertTrue(nullBody.isBodyAssignableTo(String.class));
|
||||
assertTrue(nullBody.isBodyAssignableTo(Integer.class));
|
||||
assertTrue(nullBody.isBodyAssignableTo(Map.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMapMessageGetBody() throws JMSException {
|
||||
MapMessage nullMessage = new ActiveMQMapMessage();
|
||||
assertNull(nullMessage.getBody(Map.class));
|
||||
|
||||
MapMessage message = new ActiveMQMapMessage();
|
||||
message.setString("testkey", "testvalue");
|
||||
assertEquals("testvalue", message.getBody(Map.class).get("testkey"));
|
||||
}
|
||||
|
||||
@Test(expected = MessageFormatException.class)
|
||||
public void testMapMessageGetBodyWrongType() throws JMSException {
|
||||
MapMessage message = new ActiveMQMapMessage();
|
||||
message.setString("testkey", "testvalue");
|
||||
message.getBody(String.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStreamMessageIsAssignableTo() throws JMSException {
|
||||
StreamMessage nullBody = new ActiveMQStreamMessage();
|
||||
//Spec says always false
|
||||
assertFalse(nullBody.isBodyAssignableTo(String.class));
|
||||
assertFalse(nullBody.isBodyAssignableTo(Integer.class));
|
||||
assertFalse(nullBody.isBodyAssignableTo(Map.class));
|
||||
}
|
||||
|
||||
@Test(expected = MessageFormatException.class)
|
||||
public void testStreamMessageGetBody() throws JMSException {
|
||||
StreamMessage message = new ActiveMQStreamMessage();
|
||||
//spec says always throws exception
|
||||
message.getBody(Object.class);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue