Fix for AMQ-4563: Changes the KahaDB store to use a more consistent key for message ids. MessageId.toString can change depending on how the message was encoded.

Support storing the externally generated message id of a message in the MessageID class so that Selectors can operate against that external message id.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1488353 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2013-05-31 18:53:48 +00:00
parent 478d982440
commit b2b4fc81f7
6 changed files with 41 additions and 17 deletions

View File

@ -512,9 +512,10 @@ class AmqpProtocolConverter {
message.setJMSDestination(destination);
}
message.setProducerId(producerId);
if (message.getMessageId() == null) {
message.setMessageId(new MessageId(producerId, messageIdGenerator.getNextSequenceId()));
}
MessageId messageId = message.getMessageId();
messageId.setProducerId(producerId);
messageId.setProducerSequenceId(messageIdGenerator.getNextSequenceId());
if (LOG.isTraceEnabled()) {
LOG.trace("Inbound Message:{} from Producer:{}", message.getMessageId(), producerId);

View File

@ -141,7 +141,7 @@ public class ActiveMQMessage extends Message implements org.apache.activemq.Mess
// so lets set the IDs to be 1
MessageId id = new MessageId();
id.setTextView(value);
this.setMessageId(messageId);
this.setMessageId(id);
}
} else {
this.setMessageId(null);

View File

@ -26,6 +26,7 @@ public class MessageId implements DataStructure, Comparable<MessageId> {
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_ID;
protected String textView;
protected ProducerId producerId;
protected long producerSequenceId;
protected long brokerSequenceId;
@ -69,6 +70,8 @@ public class MessageId implements DataStructure, Comparable<MessageId> {
if (p >= 0) {
producerSequenceId = Long.parseLong(messageKey.substring(p + 1));
messageKey = messageKey.substring(0, p);
} else {
throw new NumberFormatException();
}
producerId = new ProducerId(messageKey);
}
@ -79,9 +82,17 @@ public class MessageId implements DataStructure, Comparable<MessageId> {
* accommodate foreign JMS message IDs
*/
public void setTextView(String key) {
this.textView = key;
this.key = key;
}
/**
* @return
*/
public String getTextView() {
return textView;
}
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
}
@ -105,9 +116,21 @@ public class MessageId implements DataStructure, Comparable<MessageId> {
return hashCode;
}
public String toProducerKey() {
if( textView==null ) {
return toString();
} else {
return producerId.toString() + ":" + producerSequenceId;
}
}
public String toString() {
if (key == null) {
key = producerId.toString() + ":" + producerSequenceId;
if( textView!=null ) {
key = textView;
} else {
key = producerId.toString() + ":" + producerSequenceId;
}
}
return key;
}

View File

@ -423,7 +423,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public void addMessage(ConnectionContext context, Message message) throws IOException {
KahaAddMessageCommand command = new KahaAddMessageCommand();
command.setDestination(dest);
command.setMessageId(message.getMessageId().toString());
command.setMessageId(message.getMessageId().toProducerKey());
command.setTransactionInfo(transactionIdTransformer.transform(message.getTransactionId()));
command.setPriority(message.getPriority());
command.setPrioritySupported(isPrioritizedMessages());
@ -436,7 +436,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
command.setDestination(dest);
command.setMessageId(ack.getLastMessageId().toString());
command.setMessageId(ack.getLastMessageId().toProducerKey());
command.setTransactionInfo(transactionIdTransformer.transform(ack.getTransactionId()));
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
@ -451,7 +451,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
public Message getMessage(MessageId identity) throws IOException {
final String key = identity.toString();
final String key = identity.toProducerKey();
// Hopefully one day the page file supports concurrent read
// operations... but for now we must
@ -590,7 +590,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
@Override
public void setBatch(MessageId identity) throws IOException {
try {
final String key = identity.toString();
final String key = identity.toProducerKey();
lockAsyncJobQueue();
// Hopefully one day the page file supports concurrent read
@ -707,7 +707,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
command.setDestination(dest);
command.setSubscriptionKey(subscriptionKey);
command.setMessageId(messageId.toString());
command.setMessageId(messageId.toProducerKey());
command.setTransactionInfo(ack != null ? transactionIdTransformer.transform(ack.getTransactionId()) : null);
if (ack != null && ack.isUnmatchedAck()) {
command.setAck(UNMATCHED);

View File

@ -2268,7 +2268,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
this.indexLock.writeLock().lock();
try {
for (MessageAck ack : acks) {
ackedAndPrepared.add(ack.getLastMessageId().toString());
ackedAndPrepared.add(ack.getLastMessageId().toProducerKey());
}
} finally {
this.indexLock.writeLock().unlock();
@ -2280,7 +2280,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
this.indexLock.writeLock().lock();
try {
for (MessageAck ack : acks) {
ackedAndPrepared.remove(ack.getLastMessageId().toString());
ackedAndPrepared.remove(ack.getLastMessageId().toProducerKey());
}
} finally {
this.indexLock.writeLock().unlock();

View File

@ -135,14 +135,14 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
public void addMessage(ConnectionContext context, Message message) throws IOException {
KahaAddMessageCommand command = new KahaAddMessageCommand();
command.setDestination(dest);
command.setMessageId(message.getMessageId().toString());
command.setMessageId(message.getMessageId().toProducerKey());
processAdd(command, message.getTransactionId(), wireFormat.marshal(message));
}
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
command.setDestination(dest);
command.setMessageId(ack.getLastMessageId().toString());
command.setMessageId(ack.getLastMessageId().toProducerKey());
processRemove(command, ack.getTransactionId());
}
@ -153,7 +153,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
}
public Message getMessage(MessageId identity) throws IOException {
final String key = identity.toString();
final String key = identity.toProducerKey();
// Hopefully one day the page file supports concurrent read operations... but for now we must
// externally synchronize...
@ -241,7 +241,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
@Override
public void setBatch(MessageId identity) throws IOException {
final String key = identity.toString();
final String key = identity.toProducerKey();
// Hopefully one day the page file supports concurrent read operations... but for now we must
// externally synchronize...
@ -282,7 +282,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
command.setDestination(dest);
command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
command.setMessageId(messageId.toString());
command.setMessageId(messageId.toProducerKey());
// We are not passed a transaction info.. so we can't participate in a transaction.
// Looks like a design issue with the TopicMessageStore interface. Also we can't recover the original ack
// to pass back to the XA recover method.