diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 3cc1a1635e..66a8d66395 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -47,6 +47,7 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.utils.DataConstants; @@ -75,6 +76,8 @@ import org.apache.activemq.util.MarshallingSupport; import org.apache.activemq.wireformat.WireFormat; import org.fusesource.hawtbuf.UTF8Buffer; +import static org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP; + public final class OpenWireMessageConverter { private static final SimpleString JMS_TYPE_PROPERTY = new SimpleString("JMSType"); @@ -529,7 +532,7 @@ public final class OpenWireMessageConverter { AMQConsumer consumer, UUID serverNodeUUID) throws IOException { final ActiveMQMessage amqMsg; final byte coreType = coreMessage.getType(); - final Boolean compressProp = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED); + final Boolean compressProp = getObjectProperty(coreMessage, Boolean.class, AMQ_MSG_COMPRESSED); final boolean isCompressed = compressProp != null && compressProp; final byte[] bytes; final ActiveMQBuffer buffer = coreMessage.getDataBuffer(); @@ -564,7 +567,7 @@ public final class OpenWireMessageConverter { throw new IllegalStateException("Unknown message type: " + coreMessage.getType()); } - final String type = coreMessage.getStringProperty(JMS_TYPE_PROPERTY); + final String type = getObjectProperty(coreMessage, String.class, JMS_TYPE_PROPERTY); if (type != null) { amqMsg.setJMSType(type); } @@ -573,7 +576,7 @@ public final class OpenWireMessageConverter { amqMsg.setPriority(coreMessage.getPriority()); amqMsg.setTimestamp(coreMessage.getTimestamp()); - Long brokerInTime = (Long) coreMessage.getObjectProperty(AMQ_MSG_BROKER_IN_TIME); + Long brokerInTime = getObjectProperty(coreMessage, Long.class, AMQ_MSG_BROKER_IN_TIME); if (brokerInTime == null) { brokerInTime = 0L; } @@ -583,35 +586,35 @@ public final class OpenWireMessageConverter { //we need check null because messages may come from other clients //and those amq specific attribute may not be set. - Long arrival = (Long) coreMessage.getObjectProperty(AMQ_MSG_ARRIVAL); + Long arrival = getObjectProperty(coreMessage, Long.class, AMQ_MSG_ARRIVAL); if (arrival == null) { //messages from other sources (like core client) may not set this prop arrival = 0L; } amqMsg.setArrival(arrival); - final Object brokerPath = coreMessage.getObjectProperty(AMQ_MSG_BROKER_PATH); - if (brokerPath instanceof SimpleString && ((SimpleString)brokerPath).length() > 0) { - setAMQMsgBrokerPath(amqMsg, ((SimpleString)brokerPath).toString()); + final SimpleString brokerPath = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_BROKER_PATH); + if (brokerPath != null && brokerPath.length() > 0) { + setAMQMsgBrokerPath(amqMsg, brokerPath.toString()); } - final Object clusterPath = coreMessage.getObjectProperty(AMQ_MSG_CLUSTER); - if (clusterPath instanceof SimpleString && ((SimpleString)clusterPath).length() > 0) { - setAMQMsgClusterPath(amqMsg, ((SimpleString)clusterPath).toString()); + final SimpleString clusterPath = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_CLUSTER); + if (clusterPath != null && clusterPath.length() > 0) { + setAMQMsgClusterPath(amqMsg, clusterPath.toString()); } - Integer commandId = (Integer) coreMessage.getObjectProperty(AMQ_MSG_COMMAND_ID); + Integer commandId = getObjectProperty(coreMessage, Integer.class, AMQ_MSG_COMMAND_ID); if (commandId == null) { commandId = -1; } amqMsg.setCommandId(commandId); - final SimpleString corrId = (SimpleString) coreMessage.getObjectProperty(JMS_CORRELATION_ID_PROPERTY); + final SimpleString corrId = getObjectProperty(coreMessage, SimpleString.class, JMS_CORRELATION_ID_PROPERTY); if (corrId != null) { amqMsg.setCorrelationId(corrId.toString()); } - final byte[] dsBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_DATASTRUCTURE); + final byte[] dsBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_DATASTRUCTURE); if (dsBytes != null) { setAMQMsgDataStructure(amqMsg, marshaller, dsBytes); } @@ -626,7 +629,7 @@ public final class OpenWireMessageConverter { amqMsg.setGroupSequence(coreMessage.getGroupSequence()); - final byte[] midBytes = coreMessage.getBytesProperty(AMQ_MSG_MESSAGE_ID); + final byte[] midBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_MESSAGE_ID); final MessageId mid; if (midBytes != null) { ByteSequence midSeq = new ByteSequence(midBytes); @@ -639,45 +642,45 @@ public final class OpenWireMessageConverter { amqMsg.setMessageId(mid); - final byte[] origDestBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_ORIG_DESTINATION); + final byte[] origDestBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_ORIG_DESTINATION); if (origDestBytes != null) { setAMQMsgOriginalDestination(amqMsg, marshaller, origDestBytes); } - final byte[] origTxIdBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_ORIG_TXID); + final byte[] origTxIdBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_ORIG_TXID); if (origTxIdBytes != null) { setAMQMsgOriginalTransactionId(amqMsg, marshaller, origTxIdBytes); } - final byte[] producerIdBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_PRODUCER_ID); + final byte[] producerIdBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_PRODUCER_ID); if (producerIdBytes != null) { ProducerId producerId = (ProducerId) marshaller.unmarshal(new ByteSequence(producerIdBytes)); amqMsg.setProducerId(producerId); } - final byte[] marshalledBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_MARSHALL_PROP); + final byte[] marshalledBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_MARSHALL_PROP); if (marshalledBytes != null) { amqMsg.setMarshalledProperties(new ByteSequence(marshalledBytes)); } amqMsg.setRedeliveryCounter(reference.getDeliveryCount() - 1); - final byte[] replyToBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_REPLY_TO); + final byte[] replyToBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_REPLY_TO); if (replyToBytes != null) { setAMQMsgReplyTo(amqMsg, marshaller, replyToBytes); } - final Object userId = coreMessage.getObjectProperty(AMQ_MSG_USER_ID); - if (userId instanceof SimpleString && ((SimpleString)userId).length() > 0) { - amqMsg.setUserID(((SimpleString)userId).toString()); + final SimpleString userId = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_USER_ID); + if (userId != null && userId.length() > 0) { + amqMsg.setUserID(userId.toString()); } - final Boolean isDroppable = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_DROPPABLE); + final Boolean isDroppable = getObjectProperty(coreMessage, Boolean.class, AMQ_MSG_DROPPABLE); if (isDroppable != null) { amqMsg.setDroppable(isDroppable); } - final SimpleString dlqCause = (SimpleString) coreMessage.getObjectProperty(AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY); + final SimpleString dlqCause = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY); if (dlqCause != null) { setAMQMsgDlqDeliveryFailureCause(amqMsg, dlqCause); } @@ -687,7 +690,7 @@ public final class OpenWireMessageConverter { setAMQMsgHdrLastValueName(amqMsg, lastValueProperty); } - final Long ingressTimestamp = coreMessage.getPropertyNames().contains(org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP) ? coreMessage.getLongProperty(org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP) : null; + final Long ingressTimestamp = getObjectProperty(coreMessage, Long.class, HDR_INGRESS_TIMESTAMP); if (ingressTimestamp != null) { setAMQMsgHdrIngressTimestamp(amqMsg, ingressTimestamp); } @@ -704,6 +707,17 @@ public final class OpenWireMessageConverter { return amqMsg; } + private static T getObjectProperty(ICoreMessage message, Class type, SimpleString property) { + if (message.getPropertyNames().contains(property)) { + try { + return type.cast(message.getObjectProperty(property)); + } catch (ClassCastException e) { + ActiveMQServerLogger.LOGGER.failedToDealWithObjectProperty(property, e.getMessage()); + } + } + return null; + } + private static byte[] toAMQMessageTextType(final ActiveMQBuffer buffer, final boolean isCompressed) throws IOException { byte[] bytes = null; @@ -946,7 +960,7 @@ public final class OpenWireMessageConverter { private static void setAMQMsgHdrIngressTimestamp(final ActiveMQMessage amqMsg, final Long ingressTimestamp) throws IOException { try { - amqMsg.setLongProperty(org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP.toString(), ingressTimestamp); + amqMsg.setLongProperty(HDR_INGRESS_TIMESTAMP.toString(), ingressTimestamp); } catch (JMSException e) { throw new IOException("failure to set ingress timestamp property " + ingressTimestamp, e); } @@ -973,7 +987,7 @@ public final class OpenWireMessageConverter { amqMsg.setObjectProperty(keyStr, prop); } } catch (JMSException e) { - throw new IOException("exception setting property " + s + " : " + prop, e); + ActiveMQServerLogger.LOGGER.failedToDealWithObjectProperty(s, e.getMessage()); } } } diff --git a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java index e1ecfd4790..6231eefe31 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java +++ b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java @@ -36,6 +36,7 @@ import org.junit.Test; import org.mockito.Mockito; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; public class OpenWireMessageConverterTest { @@ -108,4 +109,29 @@ public class OpenWireMessageConverterTest { assertTrue(messageDispatch.getMessage().getProperty(bytesPropertyKey) instanceof String); } + + @Test + public void testBadPropertyConversion() throws Exception { + final String hdrArrival = "__HDR_ARRIVAL"; + final String hdrBrokerInTime = "__HDR_BROKER_IN_TIME"; + final String hdrCommandId = "__HDR_COMMAND_ID"; + final String hdrDroppable = "__HDR_DROPPABLE"; + + ICoreMessage coreMessage = new CoreMessage().initBuffer(8); + coreMessage.putStringProperty(hdrArrival, "1234"); + coreMessage.putStringProperty(hdrBrokerInTime, "5678"); + coreMessage.putStringProperty(hdrCommandId, "foo"); + coreMessage.putStringProperty(hdrDroppable, "true"); + + MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class)); + AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class); + Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination); + + MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID); + + assertNull(messageDispatch.getMessage().getProperty(hdrArrival)); + assertNull(messageDispatch.getMessage().getProperty(hdrBrokerInTime)); + assertNull(messageDispatch.getMessage().getProperty(hdrCommandId)); + assertNull(messageDispatch.getMessage().getProperty(hdrDroppable)); + } } \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index bb72f57ebd..a056c84550 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1751,6 +1751,10 @@ public interface ActiveMQServerLogger extends BasicLogger { format = Message.Format.MESSAGE_FORMAT) void duplicateAddressSettingMatch(String match); + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222302, value = "Failed to deal with property {0} when converting message from core to OpenWire: {1}", format = Message.Format.MESSAGE_FORMAT) + void failedToDealWithObjectProperty(SimpleString property, String exceptionMessage); + @LogMessage(level = Logger.Level.ERROR) @Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT) void initializationError(@Cause Throwable e);