ARTEMIS-3457 log WARN for OpenWire property conversion problem

While converting a core message to an OpenWire message there may be an
error processing a property value. Currently this results in an
exception and the message is not dispatched to the client. The broker
eventually attempts to redeliver this message resulting in the same
error. Instead of throwing an exception the broker should simply log a
WARN message and skip the property. This will allow clients to receive
the message without the problematic property and the broker will not
have to attempt to redeliver the message again.
This commit is contained in:
Justin Bertram 2021-09-02 11:23:12 -05:00 committed by clebertsuconic
parent a4be85369d
commit 13df6a8fb9
3 changed files with 71 additions and 27 deletions

View File

@ -47,6 +47,7 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools; import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.DataConstants;
@ -75,6 +76,8 @@ import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtbuf.UTF8Buffer; import org.fusesource.hawtbuf.UTF8Buffer;
import static org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP;
public final class OpenWireMessageConverter { public final class OpenWireMessageConverter {
private static final SimpleString JMS_TYPE_PROPERTY = new SimpleString("JMSType"); private static final SimpleString JMS_TYPE_PROPERTY = new SimpleString("JMSType");
@ -529,7 +532,7 @@ public final class OpenWireMessageConverter {
AMQConsumer consumer, UUID serverNodeUUID) throws IOException { AMQConsumer consumer, UUID serverNodeUUID) throws IOException {
final ActiveMQMessage amqMsg; final ActiveMQMessage amqMsg;
final byte coreType = coreMessage.getType(); final byte coreType = coreMessage.getType();
final Boolean compressProp = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED); final Boolean compressProp = getObjectProperty(coreMessage, Boolean.class, AMQ_MSG_COMPRESSED);
final boolean isCompressed = compressProp != null && compressProp; final boolean isCompressed = compressProp != null && compressProp;
final byte[] bytes; final byte[] bytes;
final ActiveMQBuffer buffer = coreMessage.getDataBuffer(); final ActiveMQBuffer buffer = coreMessage.getDataBuffer();
@ -564,7 +567,7 @@ public final class OpenWireMessageConverter {
throw new IllegalStateException("Unknown message type: " + coreMessage.getType()); throw new IllegalStateException("Unknown message type: " + coreMessage.getType());
} }
final String type = coreMessage.getStringProperty(JMS_TYPE_PROPERTY); final String type = getObjectProperty(coreMessage, String.class, JMS_TYPE_PROPERTY);
if (type != null) { if (type != null) {
amqMsg.setJMSType(type); amqMsg.setJMSType(type);
} }
@ -573,7 +576,7 @@ public final class OpenWireMessageConverter {
amqMsg.setPriority(coreMessage.getPriority()); amqMsg.setPriority(coreMessage.getPriority());
amqMsg.setTimestamp(coreMessage.getTimestamp()); amqMsg.setTimestamp(coreMessage.getTimestamp());
Long brokerInTime = (Long) coreMessage.getObjectProperty(AMQ_MSG_BROKER_IN_TIME); Long brokerInTime = getObjectProperty(coreMessage, Long.class, AMQ_MSG_BROKER_IN_TIME);
if (brokerInTime == null) { if (brokerInTime == null) {
brokerInTime = 0L; brokerInTime = 0L;
} }
@ -583,35 +586,35 @@ public final class OpenWireMessageConverter {
//we need check null because messages may come from other clients //we need check null because messages may come from other clients
//and those amq specific attribute may not be set. //and those amq specific attribute may not be set.
Long arrival = (Long) coreMessage.getObjectProperty(AMQ_MSG_ARRIVAL); Long arrival = getObjectProperty(coreMessage, Long.class, AMQ_MSG_ARRIVAL);
if (arrival == null) { if (arrival == null) {
//messages from other sources (like core client) may not set this prop //messages from other sources (like core client) may not set this prop
arrival = 0L; arrival = 0L;
} }
amqMsg.setArrival(arrival); amqMsg.setArrival(arrival);
final Object brokerPath = coreMessage.getObjectProperty(AMQ_MSG_BROKER_PATH); final SimpleString brokerPath = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_BROKER_PATH);
if (brokerPath instanceof SimpleString && ((SimpleString)brokerPath).length() > 0) { if (brokerPath != null && brokerPath.length() > 0) {
setAMQMsgBrokerPath(amqMsg, ((SimpleString)brokerPath).toString()); setAMQMsgBrokerPath(amqMsg, brokerPath.toString());
} }
final Object clusterPath = coreMessage.getObjectProperty(AMQ_MSG_CLUSTER); final SimpleString clusterPath = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_CLUSTER);
if (clusterPath instanceof SimpleString && ((SimpleString)clusterPath).length() > 0) { if (clusterPath != null && clusterPath.length() > 0) {
setAMQMsgClusterPath(amqMsg, ((SimpleString)clusterPath).toString()); setAMQMsgClusterPath(amqMsg, clusterPath.toString());
} }
Integer commandId = (Integer) coreMessage.getObjectProperty(AMQ_MSG_COMMAND_ID); Integer commandId = getObjectProperty(coreMessage, Integer.class, AMQ_MSG_COMMAND_ID);
if (commandId == null) { if (commandId == null) {
commandId = -1; commandId = -1;
} }
amqMsg.setCommandId(commandId); amqMsg.setCommandId(commandId);
final SimpleString corrId = (SimpleString) coreMessage.getObjectProperty(JMS_CORRELATION_ID_PROPERTY); final SimpleString corrId = getObjectProperty(coreMessage, SimpleString.class, JMS_CORRELATION_ID_PROPERTY);
if (corrId != null) { if (corrId != null) {
amqMsg.setCorrelationId(corrId.toString()); amqMsg.setCorrelationId(corrId.toString());
} }
final byte[] dsBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_DATASTRUCTURE); final byte[] dsBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_DATASTRUCTURE);
if (dsBytes != null) { if (dsBytes != null) {
setAMQMsgDataStructure(amqMsg, marshaller, dsBytes); setAMQMsgDataStructure(amqMsg, marshaller, dsBytes);
} }
@ -626,7 +629,7 @@ public final class OpenWireMessageConverter {
amqMsg.setGroupSequence(coreMessage.getGroupSequence()); amqMsg.setGroupSequence(coreMessage.getGroupSequence());
final byte[] midBytes = coreMessage.getBytesProperty(AMQ_MSG_MESSAGE_ID); final byte[] midBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_MESSAGE_ID);
final MessageId mid; final MessageId mid;
if (midBytes != null) { if (midBytes != null) {
ByteSequence midSeq = new ByteSequence(midBytes); ByteSequence midSeq = new ByteSequence(midBytes);
@ -639,45 +642,45 @@ public final class OpenWireMessageConverter {
amqMsg.setMessageId(mid); amqMsg.setMessageId(mid);
final byte[] origDestBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_ORIG_DESTINATION); final byte[] origDestBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_ORIG_DESTINATION);
if (origDestBytes != null) { if (origDestBytes != null) {
setAMQMsgOriginalDestination(amqMsg, marshaller, origDestBytes); setAMQMsgOriginalDestination(amqMsg, marshaller, origDestBytes);
} }
final byte[] origTxIdBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_ORIG_TXID); final byte[] origTxIdBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_ORIG_TXID);
if (origTxIdBytes != null) { if (origTxIdBytes != null) {
setAMQMsgOriginalTransactionId(amqMsg, marshaller, origTxIdBytes); setAMQMsgOriginalTransactionId(amqMsg, marshaller, origTxIdBytes);
} }
final byte[] producerIdBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_PRODUCER_ID); final byte[] producerIdBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_PRODUCER_ID);
if (producerIdBytes != null) { if (producerIdBytes != null) {
ProducerId producerId = (ProducerId) marshaller.unmarshal(new ByteSequence(producerIdBytes)); ProducerId producerId = (ProducerId) marshaller.unmarshal(new ByteSequence(producerIdBytes));
amqMsg.setProducerId(producerId); amqMsg.setProducerId(producerId);
} }
final byte[] marshalledBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_MARSHALL_PROP); final byte[] marshalledBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_MARSHALL_PROP);
if (marshalledBytes != null) { if (marshalledBytes != null) {
amqMsg.setMarshalledProperties(new ByteSequence(marshalledBytes)); amqMsg.setMarshalledProperties(new ByteSequence(marshalledBytes));
} }
amqMsg.setRedeliveryCounter(reference.getDeliveryCount() - 1); amqMsg.setRedeliveryCounter(reference.getDeliveryCount() - 1);
final byte[] replyToBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_REPLY_TO); final byte[] replyToBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_REPLY_TO);
if (replyToBytes != null) { if (replyToBytes != null) {
setAMQMsgReplyTo(amqMsg, marshaller, replyToBytes); setAMQMsgReplyTo(amqMsg, marshaller, replyToBytes);
} }
final Object userId = coreMessage.getObjectProperty(AMQ_MSG_USER_ID); final SimpleString userId = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_USER_ID);
if (userId instanceof SimpleString && ((SimpleString)userId).length() > 0) { if (userId != null && userId.length() > 0) {
amqMsg.setUserID(((SimpleString)userId).toString()); amqMsg.setUserID(userId.toString());
} }
final Boolean isDroppable = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_DROPPABLE); final Boolean isDroppable = getObjectProperty(coreMessage, Boolean.class, AMQ_MSG_DROPPABLE);
if (isDroppable != null) { if (isDroppable != null) {
amqMsg.setDroppable(isDroppable); amqMsg.setDroppable(isDroppable);
} }
final SimpleString dlqCause = (SimpleString) coreMessage.getObjectProperty(AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY); final SimpleString dlqCause = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
if (dlqCause != null) { if (dlqCause != null) {
setAMQMsgDlqDeliveryFailureCause(amqMsg, dlqCause); setAMQMsgDlqDeliveryFailureCause(amqMsg, dlqCause);
} }
@ -687,7 +690,7 @@ public final class OpenWireMessageConverter {
setAMQMsgHdrLastValueName(amqMsg, lastValueProperty); setAMQMsgHdrLastValueName(amqMsg, lastValueProperty);
} }
final Long ingressTimestamp = coreMessage.getPropertyNames().contains(org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP) ? coreMessage.getLongProperty(org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP) : null; final Long ingressTimestamp = getObjectProperty(coreMessage, Long.class, HDR_INGRESS_TIMESTAMP);
if (ingressTimestamp != null) { if (ingressTimestamp != null) {
setAMQMsgHdrIngressTimestamp(amqMsg, ingressTimestamp); setAMQMsgHdrIngressTimestamp(amqMsg, ingressTimestamp);
} }
@ -704,6 +707,17 @@ public final class OpenWireMessageConverter {
return amqMsg; return amqMsg;
} }
private static <T> T getObjectProperty(ICoreMessage message, Class<T> type, SimpleString property) {
if (message.getPropertyNames().contains(property)) {
try {
return type.cast(message.getObjectProperty(property));
} catch (ClassCastException e) {
ActiveMQServerLogger.LOGGER.failedToDealWithObjectProperty(property, e.getMessage());
}
}
return null;
}
private static byte[] toAMQMessageTextType(final ActiveMQBuffer buffer, private static byte[] toAMQMessageTextType(final ActiveMQBuffer buffer,
final boolean isCompressed) throws IOException { final boolean isCompressed) throws IOException {
byte[] bytes = null; byte[] bytes = null;
@ -946,7 +960,7 @@ public final class OpenWireMessageConverter {
private static void setAMQMsgHdrIngressTimestamp(final ActiveMQMessage amqMsg, private static void setAMQMsgHdrIngressTimestamp(final ActiveMQMessage amqMsg,
final Long ingressTimestamp) throws IOException { final Long ingressTimestamp) throws IOException {
try { try {
amqMsg.setLongProperty(org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP.toString(), ingressTimestamp); amqMsg.setLongProperty(HDR_INGRESS_TIMESTAMP.toString(), ingressTimestamp);
} catch (JMSException e) { } catch (JMSException e) {
throw new IOException("failure to set ingress timestamp property " + ingressTimestamp, e); throw new IOException("failure to set ingress timestamp property " + ingressTimestamp, e);
} }
@ -973,7 +987,7 @@ public final class OpenWireMessageConverter {
amqMsg.setObjectProperty(keyStr, prop); amqMsg.setObjectProperty(keyStr, prop);
} }
} catch (JMSException e) { } catch (JMSException e) {
throw new IOException("exception setting property " + s + " : " + prop, e); ActiveMQServerLogger.LOGGER.failedToDealWithObjectProperty(s, e.getMessage());
} }
} }
} }

View File

@ -36,6 +36,7 @@ import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
public class OpenWireMessageConverterTest { public class OpenWireMessageConverterTest {
@ -108,4 +109,29 @@ public class OpenWireMessageConverterTest {
assertTrue(messageDispatch.getMessage().getProperty(bytesPropertyKey) instanceof String); assertTrue(messageDispatch.getMessage().getProperty(bytesPropertyKey) instanceof String);
} }
@Test
public void testBadPropertyConversion() throws Exception {
final String hdrArrival = "__HDR_ARRIVAL";
final String hdrBrokerInTime = "__HDR_BROKER_IN_TIME";
final String hdrCommandId = "__HDR_COMMAND_ID";
final String hdrDroppable = "__HDR_DROPPABLE";
ICoreMessage coreMessage = new CoreMessage().initBuffer(8);
coreMessage.putStringProperty(hdrArrival, "1234");
coreMessage.putStringProperty(hdrBrokerInTime, "5678");
coreMessage.putStringProperty(hdrCommandId, "foo");
coreMessage.putStringProperty(hdrDroppable, "true");
MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID);
assertNull(messageDispatch.getMessage().getProperty(hdrArrival));
assertNull(messageDispatch.getMessage().getProperty(hdrBrokerInTime));
assertNull(messageDispatch.getMessage().getProperty(hdrCommandId));
assertNull(messageDispatch.getMessage().getProperty(hdrDroppable));
}
} }

View File

@ -1751,6 +1751,10 @@ public interface ActiveMQServerLogger extends BasicLogger {
format = Message.Format.MESSAGE_FORMAT) format = Message.Format.MESSAGE_FORMAT)
void duplicateAddressSettingMatch(String match); void duplicateAddressSettingMatch(String match);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222302, value = "Failed to deal with property {0} when converting message from core to OpenWire: {1}", format = Message.Format.MESSAGE_FORMAT)
void failedToDealWithObjectProperty(SimpleString property, String exceptionMessage);
@LogMessage(level = Logger.Level.ERROR) @LogMessage(level = Logger.Level.ERROR)
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT) @Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
void initializationError(@Cause Throwable e); void initializationError(@Cause Throwable e);