diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 62472bb65b..3348b0fa9e 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -590,14 +590,14 @@ public final class OpenWireMessageConverter { } amqMsg.setArrival(arrival); - final String brokerPath = (String) coreMessage.getObjectProperty(AMQ_MSG_BROKER_PATH); - if (brokerPath != null && !brokerPath.isEmpty()) { - setAMQMsgBrokerPath(amqMsg, brokerPath); + final Object brokerPath = coreMessage.getObjectProperty(AMQ_MSG_BROKER_PATH); + if (brokerPath != null && brokerPath instanceof SimpleString && ((SimpleString)brokerPath).length() > 0) { + setAMQMsgBrokerPath(amqMsg, ((SimpleString)brokerPath).toString()); } - final String clusterPath = (String) coreMessage.getObjectProperty(AMQ_MSG_CLUSTER); - if (clusterPath != null && !clusterPath.isEmpty()) { - setAMQMsgClusterPath(amqMsg, clusterPath); + final Object clusterPath = coreMessage.getObjectProperty(AMQ_MSG_CLUSTER); + if (clusterPath != null && clusterPath instanceof SimpleString && ((SimpleString)clusterPath).length() > 0) { + setAMQMsgClusterPath(amqMsg, ((SimpleString)clusterPath).toString()); } Integer commandId = (Integer) coreMessage.getObjectProperty(AMQ_MSG_COMMAND_ID); @@ -672,9 +672,9 @@ public final class OpenWireMessageConverter { setAMQMsgReplyTo(amqMsg, marshaller, replyToBytes); } - final String userId = (String) coreMessage.getObjectProperty(AMQ_MSG_USER_ID); - if (userId != null) { - amqMsg.setUserID(userId); + final Object userId = coreMessage.getObjectProperty(AMQ_MSG_USER_ID); + if (userId != null && userId instanceof SimpleString && ((SimpleString)userId).length() > 0) { + amqMsg.setUserID(((SimpleString)userId).toString()); } final Boolean isDroppable = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_DROPPABLE); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java index 979d2765cf..d911e715ac 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java @@ -73,6 +73,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.tests.util.RandomUtil; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; @@ -1780,6 +1781,34 @@ public class SimpleOpenWireTest extends BasicOpenWireTest { assertNull(transaction); } + @Test + public void testPropertyConversions() throws Exception { + final String BROKER_PATH = RandomUtil.randomString(); + final String CLUSTER = RandomUtil.randomString(); + final String USER_ID = RandomUtil.randomString(); + + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); ///// PRODUCE MESSAGE + + MessageProducer producer = session.createProducer(queue); + TextMessage message = session.createTextMessage("This is a text message"); + message.setStringProperty("__HDR_BROKER_PATH", BROKER_PATH); + message.setStringProperty("__HDR_CLUSTER", CLUSTER); + message.setStringProperty("__HDR_USER_ID", USER_ID); + + producer.send(message); + + MessageConsumer messageConsumer = session.createConsumer(queue); + connection.start(); + TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000); + assertNotNull(messageReceived); + assertEquals(BROKER_PATH, messageReceived.getStringProperty("__HDR_BROKER_PATH")); + assertEquals(CLUSTER, messageReceived.getStringProperty("__HDR_CLUSTER")); + assertEquals(USER_ID, messageReceived.getStringProperty("__HDR_USER_ID")); + } + } + private void checkQueueEmpty(String qName) { PostOffice po = server.getPostOffice(); LocalQueueBinding binding = (LocalQueueBinding) po.getBinding(SimpleString.toSimpleString(qName));