Fix for client's that use a message Id similar to ActiveMQ's version
which can throw off the ack later on when a stored message is
dispatched.
This commit is contained in:
Timothy Bish 2014-03-27 11:32:57 -04:00
parent 6d8449f053
commit da07a11760
1 changed files with 16 additions and 10 deletions

View File

@ -561,14 +561,20 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
}
message.setProducerId(producerId);
MessageId messageId = message.getMessageId();
if (messageId == null) {
messageId = new MessageId();
message.setMessageId(messageId);
// Always override the AMQP client's MessageId with our own. Preserve the
// original in the TextView property for later Ack.
MessageId messageId = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
MessageId amqpMessageId = message.getMessageId();
if (amqpMessageId != null) {
if (amqpMessageId.getTextView() != null) {
messageId.setTextView(amqpMessageId.getTextView());
} else {
messageId.setTextView(amqpMessageId.toString());
}
}
messageId.setProducerId(producerId);
messageId.setProducerSequenceId(messageIdGenerator.getNextSequenceId());
message.setMessageId(messageId);
LOG.trace("Inbound Message:{} from Producer:{}", message.getMessageId(), producerId + ":" + messageId.getProducerSequenceId());
@ -580,12 +586,12 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
}
// Lets handle the case where the expiration was set, but the timestamp
// was not set by the client. Lets assign the timestamp now, and adjust the
// was not set by the client. Lets assign the timestamp now, and adjust the
// expiration.
if( message.getExpiration()!= 0 ) {
if( message.getTimestamp()==0 ) {
if (message.getExpiration() != 0) {
if (message.getTimestamp() == 0) {
message.setTimestamp(System.currentTimeMillis());
message.setExpiration(message.getTimestamp()+message.getExpiration());
message.setExpiration(message.getTimestamp() + message.getExpiration());
}
}