diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java index f0aeb81a00..71a39f80ca 100644 --- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java @@ -151,6 +151,11 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag } } + @Override + public boolean isContentMarshalled() { + return content != null || dataOut == null; + } + @Override public byte getDataStructureType() { return DATA_STRUCTURE_TYPE; diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java index e1db3f7778..a69ec169e0 100644 --- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java @@ -162,6 +162,11 @@ public class ActiveMQMapMessage extends ActiveMQMessage implements MapMessage { } } + @Override + public boolean isContentMarshalled() { + return content != null || map == null || map.isEmpty(); + } + /** * Builds the message body from data * diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMessage.java index 7995993e25..c811e14742 100644 --- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMessage.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMessage.java @@ -776,4 +776,11 @@ public class ActiveMQMessage extends Message implements org.apache.activemq.Mess public void storeContentAndClear() { storeContent(); } + + @Override + protected boolean isContentMarshalled() { + //Always return true because ActiveMQMessage only has a content field + //which is already marshalled + return true; + } } diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMessageIsMarshalledTest.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMessageIsMarshalledTest.java new file mode 100644 index 0000000000..79a4a2a505 --- /dev/null +++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMessageIsMarshalledTest.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.command; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.Collection; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.util.ByteSequence; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +/** + * Test to make sure message.isMarshalled() returns the correct value + */ +@RunWith(Parameterized.class) +public class ActiveMQMessageIsMarshalledTest { + + protected enum MessageType {BYTES, MAP, TEXT, OBJECT, STREAM, MESSAGE} + + private final MessageType messageType; + + @Parameters(name="messageType={0}") + public static Collection data() { + return Arrays.asList(new Object[][] { + {MessageType.BYTES}, + {MessageType.MAP}, + {MessageType.TEXT}, + {MessageType.OBJECT}, + {MessageType.STREAM}, + {MessageType.MESSAGE} + }); + } + + public ActiveMQMessageIsMarshalledTest(final MessageType messageType) { + super(); + this.messageType = messageType; + } + + @Test + public void testIsMarshalledWithBodyAndProperties() throws Exception { + ActiveMQMessage message = getMessage(true, true); + assertIsMarshalled(message, true, true); + } + + @Test + public void testIsMarshalledWithPropertyEmptyBody() throws Exception { + ActiveMQMessage message = getMessage(false, true); + assertIsMarshalled(message, false, true); + } + + @Test + public void testIsMarshalledWithBodyEmptyProperties() throws Exception { + ActiveMQMessage message = getMessage(true, false); + assertIsMarshalled(message, true, false); + } + + @Test + public void testIsMarshalledWithEmptyBodyEmptyProperties() throws Exception { + ActiveMQMessage message = getMessage(false, false); + + //No body or properties so the message should be considered marshalled already + assertTrue(message.isMarshalled()); + } + + private ActiveMQMessage getMessage(boolean includeBody, boolean includeProperties) throws Exception { + if (MessageType.BYTES == messageType) { + return getBytesMessage(includeBody, includeProperties); + } else if (MessageType.TEXT == messageType) { + return getTextMessage(includeBody, includeProperties); + } else if (MessageType.MAP == messageType) { + return getMapMessage(includeBody, includeProperties); + } else if (MessageType.OBJECT == messageType) { + return getObjectMessage(includeBody, includeProperties); + } else if (MessageType.STREAM == messageType) { + return getStreamMessage(includeBody, includeProperties); + } else if (MessageType.MESSAGE == messageType) { + return getActiveMQMessage(includeBody, includeProperties); + } + + return null; + } + + private ActiveMQBytesMessage getBytesMessage(boolean includeBody, boolean includeProperties) throws Exception { + ActiveMQBytesMessage message = new ActiveMQBytesMessage(); + if (includeBody) { + message.writeBytes(new byte[10]); + } + if (includeProperties) { + message.setProperty("test", "test"); + } + return message; + } + + private ActiveMQMapMessage getMapMessage(boolean includeBody, boolean includeProperties) throws Exception { + ActiveMQMapMessage message = new ActiveMQMapMessage(); + if (includeBody) { + message.setString("stringbody", "stringbody"); + } + if (includeProperties) { + message.setProperty("test", "test"); + } + return message; + } + + private ActiveMQTextMessage getTextMessage(boolean includeBody, boolean includeProperties) throws Exception { + ActiveMQTextMessage message = new ActiveMQTextMessage(); + if (includeBody) { + message.setText("test"); + } + if (includeProperties) { + message.setProperty("test", "test"); + } + return message; + } + + private ActiveMQObjectMessage getObjectMessage(boolean includeBody, boolean includeProperties) throws Exception { + ActiveMQObjectMessage message = new ActiveMQObjectMessage(); + ActiveMQConnection con = ActiveMQConnection.makeConnection(); + con.setObjectMessageSerializationDefered(true); + message.setConnection(con); + if (includeBody) { + message.setObject("test"); + } + if (includeProperties) { + message.setProperty("test", "test"); + } + return message; + } + + private ActiveMQStreamMessage getStreamMessage(boolean includeBody, boolean includeProperties) throws Exception { + ActiveMQStreamMessage message = new ActiveMQStreamMessage(); + if (includeBody) { + message.writeBytes(new byte[10]); + } + if (includeProperties) { + message.setProperty("test", "test"); + } + return message; + } + + private ActiveMQMessage getActiveMQMessage(boolean includeBody, boolean includeProperties) throws Exception { + ActiveMQMessage message = new ActiveMQMessage(); + if (includeBody) { + message.setContent(new ByteSequence(new byte[10])); + } + if (includeProperties) { + message.setProperty("test", "test"); + } + return message; + } + + private void assertIsMarshalled(final ActiveMQMessage message, boolean includeBody, boolean includeProperties) throws Exception { + if (ActiveMQMessage.class.equals(message.getClass())) { + //content is either not set or already marshalled for ActiveMQMessage so this only + //relies on + assertFalse(message.isMarshalled() == includeProperties); + } else { + assertFalse(message.isMarshalled()); + message.onSend(); + message.beforeMarshall(new OpenWireFormat()); + assertTrue(message.isMarshalled()); + assertTrue(message.getMarshalledProperties() != null == includeProperties); + assertTrue(message.getContent() != null == includeBody); + } + } + +} diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java index 8c5611f732..64f0172074 100644 --- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java @@ -128,6 +128,11 @@ public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMess } } + @Override + public boolean isContentMarshalled() { + return content != null || object == null; + } + @Override public byte getDataStructureType() { return DATA_STRUCTURE_TYPE; diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java index f6e927ae3f..67159104b0 100644 --- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java @@ -153,6 +153,11 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess } } + @Override + public boolean isContentMarshalled() { + return content != null || dataOut == null; + } + @Override public byte getDataStructureType() { return DATA_STRUCTURE_TYPE; diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java index bb89378407..c70f54fa60 100644 --- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java @@ -158,6 +158,11 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage this.text = null; } + @Override + public boolean isContentMarshalled() { + return content != null || text == null; + } + /** * Clears out the message body. Clearing a message's body does not clear its * header values or property entries.

diff --git a/activemq-client/src/main/java/org/apache/activemq/command/Message.java b/activemq-client/src/main/java/org/apache/activemq/command/Message.java index ca2aee7c60..fca3b46a58 100644 --- a/activemq-client/src/main/java/org/apache/activemq/command/Message.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/Message.java @@ -124,7 +124,15 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess } public boolean isMarshalled() { - return content != null && (marshalledProperties != null || properties == null); + return isContentMarshalled() && isPropertiesMarshalled(); + } + + protected boolean isPropertiesMarshalled() { + return marshalledProperties != null || properties == null; + } + + protected boolean isContentMarshalled() { + return content != null; } protected void copy(Message copy) {