ARTEMIS-3264 handle core-to-AMQP conversion failures more gracefully

If an AMQP consumer tries to receive a message and the broker is unable
to convert the message from core to AMQP then the consumer is
disconnected and the offending message stays in the queue. When the
consumer reconnects the conversion error will happen again resulting in
a loop that can only be resolved through administrative action (e.g.
deleting the message manually or sending it to a dead letter address).

This commit fixes that problem by detecting the conversion problem and
sending the message to the queue's dead letter address. It also doesn't
disconnect the consumer.

This commit also changes the log messages associated with sending a
message to the dead letter address since this event can now occur
regardless of the delivery attempts.
This commit is contained in:
Justin Bertram 2022-09-19 11:33:00 -05:00 committed by clebertsuconic
parent 5d8e7fe829
commit cd7555c523
6 changed files with 285 additions and 208 deletions

View File

@ -88,232 +88,239 @@ public class CoreAmqpConverter {
if (coreMessage == null) {
return null;
}
if (coreMessage.isServerMessage() && coreMessage.isLargeMessage() && coreMessage.getType() == EMBEDDED_TYPE) {
//large AMQP messages received across cluster nodes
final Message message = EmbedMessageUtil.extractEmbedded(coreMessage, storageManager);
if (message instanceof AMQPMessage) {
return (AMQPMessage) message;
try {
if (coreMessage.isServerMessage() && coreMessage.isLargeMessage() && coreMessage.getType() == EMBEDDED_TYPE) {
//large AMQP messages received across cluster nodes
final Message message = EmbedMessageUtil.extractEmbedded(coreMessage, storageManager);
if (message instanceof AMQPMessage) {
return (AMQPMessage) message;
}
}
}
CoreMessageWrapper message = CoreMessageWrapper.wrap(coreMessage);
message.decode();
CoreMessageWrapper message = CoreMessageWrapper.wrap(coreMessage);
message.decode();
Header header = null;
final Properties properties = new Properties();
Map<Symbol, Object> daMap = null;
final Map<Symbol, Object> maMap = new HashMap<>();
Map<String, Object> apMap = null;
Map<Symbol, Object> footerMap = null;
Header header = null;
final Properties properties = new Properties();
Map<Symbol, Object> daMap = null;
final Map<Symbol, Object> maMap = new HashMap<>();
Map<String, Object> apMap = null;
Map<Symbol, Object> footerMap = null;
Section body = message.createAMQPSection(maMap, properties);
Section body = message.createAMQPSection(maMap, properties);
if (message.getInnerMessage().isDurable()) {
if (header == null) {
header = new Header();
if (message.getInnerMessage().isDurable()) {
if (header == null) {
header = new Header();
}
header.setDurable(true);
}
header.setDurable(true);
}
byte priority = (byte) message.getJMSPriority();
if (priority != MESSAGE_DEFAULT_PRIORITY) {
if (header == null) {
header = new Header();
byte priority = (byte) message.getJMSPriority();
if (priority != MESSAGE_DEFAULT_PRIORITY) {
if (header == null) {
header = new Header();
}
header.setPriority(UnsignedByte.valueOf(priority));
}
header.setPriority(UnsignedByte.valueOf(priority));
}
String type = message.getJMSType();
if (type != null) {
properties.setSubject(type);
}
String messageId = message.getJMSMessageID();
if (messageId != null) {
try {
properties.setMessageId(AMQPMessageIdHelper.INSTANCE.toIdObject(messageId));
} catch (ActiveMQAMQPIllegalStateException e) {
properties.setMessageId(messageId);
String type = message.getJMSType();
if (type != null) {
properties.setSubject(type);
}
} else {
if (message.getInnerMessage().getUserID() != null) {
properties.setMessageId("ID:" + message.getInnerMessage().getUserID().toString());
String messageId = message.getJMSMessageID();
if (messageId != null) {
try {
properties.setMessageId(AMQPMessageIdHelper.INSTANCE.toIdObject(messageId));
} catch (ActiveMQAMQPIllegalStateException e) {
properties.setMessageId(messageId);
}
} else {
if (message.getInnerMessage().getUserID() != null) {
properties.setMessageId("ID:" + message.getInnerMessage().getUserID().toString());
}
}
SimpleString destination = message.getDestination();
if (destination != null) {
properties.setTo(toAddress(destination.toString()));
maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, AMQPMessageSupport.destinationType(destination.toString()));
}
SimpleString replyTo = message.getJMSReplyTo();
if (replyTo != null) {
properties.setReplyTo(toAddress(replyTo.toString()));
maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, AMQPMessageSupport.destinationType(replyTo.toString()));
}
}
SimpleString destination = message.getDestination();
if (destination != null) {
properties.setTo(toAddress(destination.toString()));
maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, AMQPMessageSupport.destinationType(destination.toString()));
}
SimpleString replyTo = message.getJMSReplyTo();
if (replyTo != null) {
properties.setReplyTo(toAddress(replyTo.toString()));
maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, AMQPMessageSupport.destinationType(replyTo.toString()));
}
long scheduledDelivery = coreMessage.getScheduledDeliveryTime();
long scheduledDelivery = coreMessage.getScheduledDeliveryTime();
if (scheduledDelivery > 0) {
maMap.put(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME, scheduledDelivery);
}
if (scheduledDelivery > 0) {
maMap.put(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME, scheduledDelivery);
}
Object correlationID = message.getInnerMessage().getCorrelationID();
if (correlationID instanceof String || correlationID instanceof SimpleString) {
String c = correlationID instanceof String ? ((String) correlationID) : ((SimpleString) correlationID).toString();
try {
properties.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(c));
} catch (ActiveMQAMQPIllegalStateException e) {
Object correlationID = message.getInnerMessage().getCorrelationID();
if (correlationID instanceof String || correlationID instanceof SimpleString) {
String c = correlationID instanceof String ? ((String) correlationID) : ((SimpleString) correlationID).toString();
try {
properties.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(c));
} catch (ActiveMQAMQPIllegalStateException e) {
properties.setCorrelationId(correlationID);
}
} else {
properties.setCorrelationId(correlationID);
}
} else {
properties.setCorrelationId(correlationID);
}
long expiration = message.getExpiration();
if (expiration != 0) {
long ttl = expiration - System.currentTimeMillis();
if (ttl < 0) {
ttl = 1;
long expiration = message.getExpiration();
if (expiration != 0) {
long ttl = expiration - System.currentTimeMillis();
if (ttl < 0) {
ttl = 1;
}
if (header == null) {
header = new Header();
}
header.setTtl(new UnsignedInteger((int) ttl));
properties.setAbsoluteExpiryTime(new Date(expiration));
}
long timeStamp = message.getJMSTimestamp();
if (timeStamp != 0) {
properties.setCreationTime(new Date(timeStamp));
}
if (header == null) {
header = new Header();
}
header.setTtl(new UnsignedInteger((int) ttl));
properties.setAbsoluteExpiryTime(new Date(expiration));
}
long timeStamp = message.getJMSTimestamp();
if (timeStamp != 0) {
properties.setCreationTime(new Date(timeStamp));
}
final Set<String> keySet = MessageUtil.getPropertyNames(message.getInnerMessage());
for (String key : keySet) {
if (key.startsWith("JMSX")) {
if (key.equals("JMSXUserID")) {
String value = message.getStringProperty(key);
if (value != null) {
properties.setUserId(Binary.create(StandardCharsets.UTF_8.encode(value)));
final Set<String> keySet = MessageUtil.getPropertyNames(message.getInnerMessage());
for (String key : keySet) {
if (key.startsWith("JMSX")) {
if (key.equals("JMSXUserID")) {
String value = message.getStringProperty(key);
if (value != null) {
properties.setUserId(Binary.create(StandardCharsets.UTF_8.encode(value)));
}
continue;
} else if (key.equals("JMSXGroupID")) {
String value = message.getStringProperty(key);
properties.setGroupId(value);
continue;
} else if (key.equals("JMSXGroupSeq")) {
int value = message.getIntProperty(key);
properties.setGroupSequence(UnsignedInteger.valueOf(value));
continue;
}
continue;
} else if (key.equals("JMSXGroupID")) {
} else if (key.startsWith(JMS_AMQP_PREFIX)) {
// AMQP Message Information stored from a conversion to the Core Message
if (key.equals(JMS_AMQP_NATIVE)) {
// skip..internal use only
continue;
} else if (key.equals(JMS_AMQP_FIRST_ACQUIRER)) {
if (header == null) {
header = new Header();
}
header.setFirstAcquirer(message.getBooleanProperty(key));
continue;
} else if (key.equals(JMS_AMQP_HEADER)) {
if (header == null) {
header = new Header();
}
continue;
} else if (key.equals(JMS_AMQP_HEADER_DURABLE)) {
if (header == null) {
header = new Header();
}
header.setDurable(message.getInnerMessage().isDurable());
continue;
} else if (key.equals(JMS_AMQP_HEADER_PRIORITY)) {
if (header == null) {
header = new Header();
}
header.setPriority(UnsignedByte.valueOf(priority));
continue;
} else if (key.startsWith(JMS_AMQP_PROPERTIES)) {
continue;
} else if (key.startsWith(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX)) {
if (daMap == null) {
daMap = new HashMap<>();
}
String name = key.substring(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX.length());
daMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
continue;
} else if (key.startsWith(JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX)) {
if (daMap == null) {
daMap = new HashMap<>();
}
String name = key.substring(JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX.length());
daMap.put(Symbol.valueOf(name), decodeEmbeddedAMQPType(message.getObjectProperty(key)));
continue;
} else if (key.startsWith(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX)) {
String name = key.substring(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX.length());
maMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
continue;
} else if (key.startsWith(JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX)) {
String name = key.substring(JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX.length());
maMap.put(Symbol.valueOf(name), decodeEmbeddedAMQPType(message.getObjectProperty(key)));
continue;
} else if (key.equals(JMS_AMQP_CONTENT_TYPE)) {
properties.setContentType(Symbol.getSymbol(message.getStringProperty(key)));
continue;
} else if (key.equals(JMS_AMQP_CONTENT_ENCODING)) {
properties.setContentEncoding(Symbol.getSymbol(message.getStringProperty(key)));
continue;
} else if (key.equals(JMS_AMQP_REPLYTO_GROUP_ID)) {
properties.setReplyToGroupId(message.getStringProperty(key));
continue;
} else if (key.equals(JMS_AMQP_ORIGINAL_ENCODING)) {
// skip..remove annotation from previous inbound transformation
continue;
} else if (key.startsWith(JMS_AMQP_ENCODED_FOOTER_PREFIX)) {
if (footerMap == null) {
footerMap = new HashMap<>();
}
String name = key.substring(JMS_AMQP_ENCODED_FOOTER_PREFIX.length());
footerMap.put(Symbol.valueOf(name), decodeEmbeddedAMQPType(message.getObjectProperty(key)));
continue;
} else if (key.startsWith(JMS_AMQP_FOOTER_PREFIX)) {
if (footerMap == null) {
footerMap = new HashMap<>();
}
String name = key.substring(JMS_AMQP_FOOTER_PREFIX.length());
footerMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
continue;
}
} else if (key.equals(Message.HDR_GROUP_ID.toString())) {
String value = message.getStringProperty(key);
properties.setGroupId(value);
continue;
} else if (key.equals("JMSXGroupSeq")) {
} else if (key.equals(Message.HDR_GROUP_SEQUENCE.toString())) {
int value = message.getIntProperty(key);
properties.setGroupSequence(UnsignedInteger.valueOf(value));
continue;
}
} else if (key.startsWith(JMS_AMQP_PREFIX)) {
// AMQP Message Information stored from a conversion to the Core Message
if (key.equals(JMS_AMQP_NATIVE)) {
} else if (key.equals(NATIVE_MESSAGE_ID)) {
// skip..internal use only
continue;
} else if (key.equals(JMS_AMQP_FIRST_ACQUIRER)) {
if (header == null) {
header = new Header();
}
header.setFirstAcquirer(message.getBooleanProperty(key));
continue;
} else if (key.equals(JMS_AMQP_HEADER)) {
if (header == null) {
header = new Header();
}
continue;
} else if (key.equals(JMS_AMQP_HEADER_DURABLE)) {
if (header == null) {
header = new Header();
}
header.setDurable(message.getInnerMessage().isDurable());
continue;
} else if (key.equals(JMS_AMQP_HEADER_PRIORITY)) {
if (header == null) {
header = new Header();
}
header.setPriority(UnsignedByte.valueOf(priority));
continue;
} else if (key.startsWith(JMS_AMQP_PROPERTIES)) {
continue;
} else if (key.startsWith(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX)) {
if (daMap == null) {
daMap = new HashMap<>();
}
String name = key.substring(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX.length());
daMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
continue;
} else if (key.startsWith(JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX)) {
if (daMap == null) {
daMap = new HashMap<>();
}
String name = key.substring(JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX.length());
daMap.put(Symbol.valueOf(name), decodeEmbeddedAMQPType(message.getObjectProperty(key)));
continue;
} else if (key.startsWith(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX)) {
String name = key.substring(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX.length());
maMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
continue;
} else if (key.startsWith(JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX)) {
String name = key.substring(JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX.length());
maMap.put(Symbol.valueOf(name), decodeEmbeddedAMQPType(message.getObjectProperty(key)));
continue;
} else if (key.equals(JMS_AMQP_CONTENT_TYPE)) {
properties.setContentType(Symbol.getSymbol(message.getStringProperty(key)));
continue;
} else if (key.equals(JMS_AMQP_CONTENT_ENCODING)) {
properties.setContentEncoding(Symbol.getSymbol(message.getStringProperty(key)));
continue;
} else if (key.equals(JMS_AMQP_REPLYTO_GROUP_ID)) {
properties.setReplyToGroupId(message.getStringProperty(key));
continue;
} else if (key.equals(JMS_AMQP_ORIGINAL_ENCODING)) {
} else if (key.endsWith(HDR_SCHEDULED_DELIVERY_TIME.toString())) {
// skip..remove annotation from previous inbound transformation
continue;
} else if (key.startsWith(JMS_AMQP_ENCODED_FOOTER_PREFIX)) {
if (footerMap == null) {
footerMap = new HashMap<>();
}
String name = key.substring(JMS_AMQP_ENCODED_FOOTER_PREFIX.length());
footerMap.put(Symbol.valueOf(name), decodeEmbeddedAMQPType(message.getObjectProperty(key)));
continue;
} else if (key.startsWith(JMS_AMQP_FOOTER_PREFIX)) {
if (footerMap == null) {
footerMap = new HashMap<>();
}
String name = key.substring(JMS_AMQP_FOOTER_PREFIX.length());
footerMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
} else if (key.equals(Message.HDR_INGRESS_TIMESTAMP.toString())) {
maMap.put(AMQPMessageSupport.INGRESS_TIME_MSG_ANNOTATION, message.getLongProperty(key));
continue;
}
} else if (key.equals(Message.HDR_GROUP_ID.toString())) {
String value = message.getStringProperty(key);
properties.setGroupId(value);
continue;
} else if (key.equals(Message.HDR_GROUP_SEQUENCE.toString())) {
int value = message.getIntProperty(key);
properties.setGroupSequence(UnsignedInteger.valueOf(value));
continue;
} else if (key.equals(NATIVE_MESSAGE_ID)) {
// skip..internal use only
continue;
} else if (key.endsWith(HDR_SCHEDULED_DELIVERY_TIME.toString())) {
// skip..remove annotation from previous inbound transformation
continue;
} else if (key.equals(Message.HDR_INGRESS_TIMESTAMP.toString())) {
maMap.put(AMQPMessageSupport.INGRESS_TIME_MSG_ANNOTATION, message.getLongProperty(key));
continue;
if (apMap == null) {
apMap = new HashMap<>();
}
Object objectProperty = message.getObjectProperty(key);
if (objectProperty instanceof byte[]) {
objectProperty = new Binary((byte[]) objectProperty);
}
apMap.put(key, objectProperty);
}
if (apMap == null) {
apMap = new HashMap<>();
}
Object objectProperty = message.getObjectProperty(key);
if (objectProperty instanceof byte[]) {
objectProperty = new Binary((byte[]) objectProperty);
}
apMap.put(key, objectProperty);
long messageID = message.getInnerMessage().getMessageID();
return AMQPStandardMessage.createMessage(messageID, 0, replyTo, header, properties, daMap, maMap, apMap, footerMap, body);
} catch (ConversionException ce) {
throw ce;
} catch (Exception e) {
throw new ConversionException(e.getMessage(), e);
}
long messageID = message.getInnerMessage().getMessageID();
return AMQPStandardMessage.createMessage(messageID, 0, replyTo, header, properties, daMap, maMap, apMap, footerMap, body);
}
private static Object decodeEmbeddedAMQPType(Object payload) {

View File

@ -16,8 +16,10 @@
*/
package org.apache.activemq.artemis.protocol.amqp.logger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.jboss.logging.BasicLogger;
import org.jboss.logging.Logger;
import org.jboss.logging.annotations.Cause;
import org.jboss.logging.annotations.LogMessage;
import org.jboss.logging.annotations.Message;
import org.jboss.logging.annotations.MessageLogger;
@ -71,4 +73,14 @@ public interface ActiveMQAMQPProtocolLogger extends BasicLogger {
@Message(id = 111004, value = "AddressFullPolicy clash on an anonymous producer between destinations {0}(addressFullPolicy={1}) and {2}(addressFullPolicy={3}). This could lead to semantic inconsistencies on your clients. Notice you could have other instances of this scenario however this message will only be logged once. log.debug output would show all instances of this event.",
format = Message.Format.MESSAGE_FORMAT)
void incompatibleAddressFullMessagePolicy(String oldAddress, String oldPolicy, String newAddress, String newPolicy);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 111005, value = "Failed to convert message. Sending it to Dead Letter Address.",
format = Message.Format.MESSAGE_FORMAT)
void messageConversionFailed(@Cause Throwable t);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 111006, value = "Unable to send message {0} to Dead Letter Address.",
format = Message.Format.MESSAGE_FORMAT)
void unableToSendMessageToDLA(MessageReference ref, @Cause Throwable t);
}

View File

@ -35,6 +35,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.message.LargeBodyReader;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
@ -47,11 +48,13 @@ import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccesso
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
import org.apache.activemq.artemis.protocol.amqp.converter.CoreAmqpConverter;
import org.apache.activemq.artemis.protocol.amqp.converter.coreWrapper.ConversionException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl;
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
@ -545,6 +548,16 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
} catch (Exception e) {
if (e instanceof ConversionException && brokerConsumer.getBinding() instanceof LocalQueueBinding) {
ActiveMQAMQPProtocolLogger.LOGGER.messageConversionFailed(e);
LocalQueueBinding queueBinding = (LocalQueueBinding) brokerConsumer.getBinding();
try {
queueBinding.getQueue().sendToDeadLetterAddress(null, messageReference);
} catch (Exception e1) {
ActiveMQAMQPProtocolLogger.LOGGER.unableToSendMessageToDLA(messageReference, e1);
}
return;
}
log.warn(e.getMessage(), e);
brokerConsumer.errorProcessing(e, messageReference);
}

View File

@ -1062,19 +1062,19 @@ public interface ActiveMQServerLogger extends BasicLogger {
void errorExpiringReferencesNoAddress(SimpleString name);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222148, value = "Message {0} has exceeded max delivery attempts. No bindings for Dead Letter Address {1} so dropping it",
@Message(id = 222148, value = "Sending message {0} to Dead Letter Address {1}, but it has no bindings so dropping it",
format = Message.Format.MESSAGE_FORMAT)
void messageExceededMaxDelivery(MessageReference ref, SimpleString name);
void noBindingsOnDLA(MessageReference ref, SimpleString name);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222149, value = "Message {0} has reached maximum delivery attempts, sending it to Dead Letter Address {1} from {2}",
@Message(id = 222149, value = "Sending message {0} to Dead Letter Address {1} from {2}",
format = Message.Format.MESSAGE_FORMAT)
void messageExceededMaxDeliverySendtoDLA(MessageReference ref, SimpleString name, SimpleString simpleString);
void sendingMessageToDLA(MessageReference ref, SimpleString name, SimpleString simpleString);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222150, value = "Message {0} has exceeded max delivery attempts. No Dead Letter Address configured for queue {1} so dropping it",
@Message(id = 222150, value = "Sending message {0} to Dead Letter Address, but there is no Dead Letter Address configured for queue {1} so dropping it",
format = Message.Format.MESSAGE_FORMAT)
void messageExceededMaxDeliveryNoDLA(MessageReference ref, SimpleString name);
void sendingMessageToDLAnoDLA(MessageReference ref, SimpleString name);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222151, value = "removing consumer which did not handle a message, consumer={0}, message={1}",

View File

@ -3712,10 +3712,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
Bindings bindingList = postOffice.lookupBindingsForAddress(deadLetterAddress);
if (bindingList == null || bindingList.getBindings().isEmpty()) {
ActiveMQServerLogger.LOGGER.messageExceededMaxDelivery(ref, deadLetterAddress);
ActiveMQServerLogger.LOGGER.noBindingsOnDLA(ref, deadLetterAddress);
ref.acknowledge(tx, AckReason.KILLED, null);
} else {
ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name);
ActiveMQServerLogger.LOGGER.sendingMessageToDLA(ref, deadLetterAddress, name);
RoutingStatus status = move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED, null, null, true);
// this shouldn't happen, but in case it does it's better to log a message than just drop the message silently
@ -3725,7 +3725,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return true;
}
} else {
ActiveMQServerLogger.LOGGER.messageExceededMaxDeliveryNoDLA(ref, name);
ActiveMQServerLogger.LOGGER.sendingMessageToDLAnoDLA(ref, name);
ref.acknowledge(tx, AckReason.KILLED, null);
}

View File

@ -16,21 +16,26 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp.interop;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.jms.client.ActiveMQSession;
import org.apache.activemq.artemis.tests.integration.amqp.JMSClientTestSupport;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.JmsTopic;
@ -82,6 +87,46 @@ public class AmqpCoreTest extends JMSClientTestSupport {
}
}
@Test(timeout = 60000)
public void testAmqpFailedConversionFromCore() throws Exception {
final SimpleString message = RandomUtil.randomSimpleString();
Connection coreJmsConn = this.createCoreConnection();
ConnectionFactory cfAMQP = new JmsConnectionFactory("amqp://127.0.0.1:" + AMQP_PORT);
Connection connectionAMQP = cfAMQP.createConnection();
try {
connectionAMQP.start();
Session sessionAMQP = connectionAMQP.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = sessionAMQP.createQueue(getQueueName());
MessageConsumer consumer = sessionAMQP.createConsumer(destination);
Session session = coreJmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
ClientSession coreSession = ((ActiveMQSession) session).getCoreSession();
ClientProducer producer = coreSession.createProducer(getQueueName());
/*
* Create a message that will intentionally fail conversion.
* The body of a TEXT_TYPE message should be written with writeNullableSimpleString().
*/
ClientMessage m = coreSession.createMessage(true);
m.setType(Message.TEXT_TYPE);
m.getBodyBuffer().writeBytes(message.getData());
producer.send(m);
Wait.assertEquals(1L, () -> server.locateQueue(getDeadLetterAddress()).getMessageCount(), 2000, 100);
m = coreSession.createMessage(true);
m.setType(Message.TEXT_TYPE);
m.getBodyBuffer().writeNullableSimpleString(message);
producer.send(m);
assertNotNull(consumer.receive(500));
} finally {
coreJmsConn.close();
connectionAMQP.close();
}
}
private void sendAmqpMessages(String address, int total) throws Exception {
ConnectionFactory cfAMQP = new JmsConnectionFactory("amqp://127.0.0.1:" + AMQP_PORT);
Connection connectionAMQP = cfAMQP.createConnection();