From 13df6a8fb9522e53d875755be1d217ab3e260d65 Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Thu, 2 Sep 2021 11:23:12 -0500 Subject: [PATCH] ARTEMIS-3457 log WARN for OpenWire property conversion problem While converting a core message to an OpenWire message there may be an error processing a property value. Currently this results in an exception and the message is not dispatched to the client. The broker eventually attempts to redeliver this message resulting in the same error. Instead of throwing an exception the broker should simply log a WARN message and skip the property. This will allow clients to receive the message without the problematic property and the broker will not have to attempt to redeliver the message again. --- .../openwire/OpenWireMessageConverter.java | 68 +++++++++++-------- .../OpenWireMessageConverterTest.java | 26 +++++++ .../core/server/ActiveMQServerLogger.java | 4 ++ 3 files changed, 71 insertions(+), 27 deletions(-) diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 3cc1a1635e..66a8d66395 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -47,6 +47,7 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.utils.DataConstants; @@ -75,6 +76,8 @@ import org.apache.activemq.util.MarshallingSupport; import org.apache.activemq.wireformat.WireFormat; import org.fusesource.hawtbuf.UTF8Buffer; +import static org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP; + public final class OpenWireMessageConverter { private static final SimpleString JMS_TYPE_PROPERTY = new SimpleString("JMSType"); @@ -529,7 +532,7 @@ public final class OpenWireMessageConverter { AMQConsumer consumer, UUID serverNodeUUID) throws IOException { final ActiveMQMessage amqMsg; final byte coreType = coreMessage.getType(); - final Boolean compressProp = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED); + final Boolean compressProp = getObjectProperty(coreMessage, Boolean.class, AMQ_MSG_COMPRESSED); final boolean isCompressed = compressProp != null && compressProp; final byte[] bytes; final ActiveMQBuffer buffer = coreMessage.getDataBuffer(); @@ -564,7 +567,7 @@ public final class OpenWireMessageConverter { throw new IllegalStateException("Unknown message type: " + coreMessage.getType()); } - final String type = coreMessage.getStringProperty(JMS_TYPE_PROPERTY); + final String type = getObjectProperty(coreMessage, String.class, JMS_TYPE_PROPERTY); if (type != null) { amqMsg.setJMSType(type); } @@ -573,7 +576,7 @@ public final class OpenWireMessageConverter { amqMsg.setPriority(coreMessage.getPriority()); amqMsg.setTimestamp(coreMessage.getTimestamp()); - Long brokerInTime = (Long) coreMessage.getObjectProperty(AMQ_MSG_BROKER_IN_TIME); + Long brokerInTime = getObjectProperty(coreMessage, Long.class, AMQ_MSG_BROKER_IN_TIME); if (brokerInTime == null) { brokerInTime = 0L; } @@ -583,35 +586,35 @@ public final class OpenWireMessageConverter { //we need check null because messages may come from other clients //and those amq specific attribute may not be set. - Long arrival = (Long) coreMessage.getObjectProperty(AMQ_MSG_ARRIVAL); + Long arrival = getObjectProperty(coreMessage, Long.class, AMQ_MSG_ARRIVAL); if (arrival == null) { //messages from other sources (like core client) may not set this prop arrival = 0L; } amqMsg.setArrival(arrival); - final Object brokerPath = coreMessage.getObjectProperty(AMQ_MSG_BROKER_PATH); - if (brokerPath instanceof SimpleString && ((SimpleString)brokerPath).length() > 0) { - setAMQMsgBrokerPath(amqMsg, ((SimpleString)brokerPath).toString()); + final SimpleString brokerPath = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_BROKER_PATH); + if (brokerPath != null && brokerPath.length() > 0) { + setAMQMsgBrokerPath(amqMsg, brokerPath.toString()); } - final Object clusterPath = coreMessage.getObjectProperty(AMQ_MSG_CLUSTER); - if (clusterPath instanceof SimpleString && ((SimpleString)clusterPath).length() > 0) { - setAMQMsgClusterPath(amqMsg, ((SimpleString)clusterPath).toString()); + final SimpleString clusterPath = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_CLUSTER); + if (clusterPath != null && clusterPath.length() > 0) { + setAMQMsgClusterPath(amqMsg, clusterPath.toString()); } - Integer commandId = (Integer) coreMessage.getObjectProperty(AMQ_MSG_COMMAND_ID); + Integer commandId = getObjectProperty(coreMessage, Integer.class, AMQ_MSG_COMMAND_ID); if (commandId == null) { commandId = -1; } amqMsg.setCommandId(commandId); - final SimpleString corrId = (SimpleString) coreMessage.getObjectProperty(JMS_CORRELATION_ID_PROPERTY); + final SimpleString corrId = getObjectProperty(coreMessage, SimpleString.class, JMS_CORRELATION_ID_PROPERTY); if (corrId != null) { amqMsg.setCorrelationId(corrId.toString()); } - final byte[] dsBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_DATASTRUCTURE); + final byte[] dsBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_DATASTRUCTURE); if (dsBytes != null) { setAMQMsgDataStructure(amqMsg, marshaller, dsBytes); } @@ -626,7 +629,7 @@ public final class OpenWireMessageConverter { amqMsg.setGroupSequence(coreMessage.getGroupSequence()); - final byte[] midBytes = coreMessage.getBytesProperty(AMQ_MSG_MESSAGE_ID); + final byte[] midBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_MESSAGE_ID); final MessageId mid; if (midBytes != null) { ByteSequence midSeq = new ByteSequence(midBytes); @@ -639,45 +642,45 @@ public final class OpenWireMessageConverter { amqMsg.setMessageId(mid); - final byte[] origDestBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_ORIG_DESTINATION); + final byte[] origDestBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_ORIG_DESTINATION); if (origDestBytes != null) { setAMQMsgOriginalDestination(amqMsg, marshaller, origDestBytes); } - final byte[] origTxIdBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_ORIG_TXID); + final byte[] origTxIdBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_ORIG_TXID); if (origTxIdBytes != null) { setAMQMsgOriginalTransactionId(amqMsg, marshaller, origTxIdBytes); } - final byte[] producerIdBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_PRODUCER_ID); + final byte[] producerIdBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_PRODUCER_ID); if (producerIdBytes != null) { ProducerId producerId = (ProducerId) marshaller.unmarshal(new ByteSequence(producerIdBytes)); amqMsg.setProducerId(producerId); } - final byte[] marshalledBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_MARSHALL_PROP); + final byte[] marshalledBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_MARSHALL_PROP); if (marshalledBytes != null) { amqMsg.setMarshalledProperties(new ByteSequence(marshalledBytes)); } amqMsg.setRedeliveryCounter(reference.getDeliveryCount() - 1); - final byte[] replyToBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_REPLY_TO); + final byte[] replyToBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_REPLY_TO); if (replyToBytes != null) { setAMQMsgReplyTo(amqMsg, marshaller, replyToBytes); } - final Object userId = coreMessage.getObjectProperty(AMQ_MSG_USER_ID); - if (userId instanceof SimpleString && ((SimpleString)userId).length() > 0) { - amqMsg.setUserID(((SimpleString)userId).toString()); + final SimpleString userId = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_USER_ID); + if (userId != null && userId.length() > 0) { + amqMsg.setUserID(userId.toString()); } - final Boolean isDroppable = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_DROPPABLE); + final Boolean isDroppable = getObjectProperty(coreMessage, Boolean.class, AMQ_MSG_DROPPABLE); if (isDroppable != null) { amqMsg.setDroppable(isDroppable); } - final SimpleString dlqCause = (SimpleString) coreMessage.getObjectProperty(AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY); + final SimpleString dlqCause = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY); if (dlqCause != null) { setAMQMsgDlqDeliveryFailureCause(amqMsg, dlqCause); } @@ -687,7 +690,7 @@ public final class OpenWireMessageConverter { setAMQMsgHdrLastValueName(amqMsg, lastValueProperty); } - final Long ingressTimestamp = coreMessage.getPropertyNames().contains(org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP) ? coreMessage.getLongProperty(org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP) : null; + final Long ingressTimestamp = getObjectProperty(coreMessage, Long.class, HDR_INGRESS_TIMESTAMP); if (ingressTimestamp != null) { setAMQMsgHdrIngressTimestamp(amqMsg, ingressTimestamp); } @@ -704,6 +707,17 @@ public final class OpenWireMessageConverter { return amqMsg; } + private static T getObjectProperty(ICoreMessage message, Class type, SimpleString property) { + if (message.getPropertyNames().contains(property)) { + try { + return type.cast(message.getObjectProperty(property)); + } catch (ClassCastException e) { + ActiveMQServerLogger.LOGGER.failedToDealWithObjectProperty(property, e.getMessage()); + } + } + return null; + } + private static byte[] toAMQMessageTextType(final ActiveMQBuffer buffer, final boolean isCompressed) throws IOException { byte[] bytes = null; @@ -946,7 +960,7 @@ public final class OpenWireMessageConverter { private static void setAMQMsgHdrIngressTimestamp(final ActiveMQMessage amqMsg, final Long ingressTimestamp) throws IOException { try { - amqMsg.setLongProperty(org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP.toString(), ingressTimestamp); + amqMsg.setLongProperty(HDR_INGRESS_TIMESTAMP.toString(), ingressTimestamp); } catch (JMSException e) { throw new IOException("failure to set ingress timestamp property " + ingressTimestamp, e); } @@ -973,7 +987,7 @@ public final class OpenWireMessageConverter { amqMsg.setObjectProperty(keyStr, prop); } } catch (JMSException e) { - throw new IOException("exception setting property " + s + " : " + prop, e); + ActiveMQServerLogger.LOGGER.failedToDealWithObjectProperty(s, e.getMessage()); } } } diff --git a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java index e1ecfd4790..6231eefe31 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java +++ b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java @@ -36,6 +36,7 @@ import org.junit.Test; import org.mockito.Mockito; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; public class OpenWireMessageConverterTest { @@ -108,4 +109,29 @@ public class OpenWireMessageConverterTest { assertTrue(messageDispatch.getMessage().getProperty(bytesPropertyKey) instanceof String); } + + @Test + public void testBadPropertyConversion() throws Exception { + final String hdrArrival = "__HDR_ARRIVAL"; + final String hdrBrokerInTime = "__HDR_BROKER_IN_TIME"; + final String hdrCommandId = "__HDR_COMMAND_ID"; + final String hdrDroppable = "__HDR_DROPPABLE"; + + ICoreMessage coreMessage = new CoreMessage().initBuffer(8); + coreMessage.putStringProperty(hdrArrival, "1234"); + coreMessage.putStringProperty(hdrBrokerInTime, "5678"); + coreMessage.putStringProperty(hdrCommandId, "foo"); + coreMessage.putStringProperty(hdrDroppable, "true"); + + MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class)); + AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class); + Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination); + + MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID); + + assertNull(messageDispatch.getMessage().getProperty(hdrArrival)); + assertNull(messageDispatch.getMessage().getProperty(hdrBrokerInTime)); + assertNull(messageDispatch.getMessage().getProperty(hdrCommandId)); + assertNull(messageDispatch.getMessage().getProperty(hdrDroppable)); + } } \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index bb72f57ebd..a056c84550 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1751,6 +1751,10 @@ public interface ActiveMQServerLogger extends BasicLogger { format = Message.Format.MESSAGE_FORMAT) void duplicateAddressSettingMatch(String match); + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222302, value = "Failed to deal with property {0} when converting message from core to OpenWire: {1}", format = Message.Format.MESSAGE_FORMAT) + void failedToDealWithObjectProperty(SimpleString property, String exceptionMessage); + @LogMessage(level = Logger.Level.ERROR) @Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT) void initializationError(@Cause Throwable e);