diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRef.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRef.java index b50ab0cdef..4bbbb64e49 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRef.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRef.java @@ -14,6 +14,7 @@ package org.apache.activemq.store.kahadaptor; +import org.apache.activemq.command.MessageId; import org.apache.activemq.kaha.StoreEntry; /** @@ -23,6 +24,7 @@ import org.apache.activemq.kaha.StoreEntry; */ public class ConsumerMessageRef{ + private MessageId messageId; private StoreEntry messageEntry; private StoreEntry ackEntry; @@ -54,5 +56,21 @@ public class ConsumerMessageRef{ this.messageEntry=messageEntry; } + + /** + * @return the messageId + */ + public MessageId getMessageId(){ + return this.messageId; + } + + + /** + * @param messageId the messageId to set + */ + public void setMessageId(MessageId messageId){ + this.messageId=messageId; + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRefMarshaller.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRefMarshaller.java index f1c49781e9..1f8f184099 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRefMarshaller.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRefMarshaller.java @@ -20,6 +20,7 @@ package org.apache.activemq.store.kahadaptor; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import org.apache.activemq.command.MessageId; import org.apache.activemq.kaha.Marshaller; import org.apache.activemq.kaha.impl.index.IndexItem; @@ -39,6 +40,7 @@ public class ConsumerMessageRefMarshaller implements Marshaller{ */ public void writePayload(Object object,DataOutput dataOut) throws IOException{ ConsumerMessageRef ref = (ConsumerMessageRef) object; + dataOut.writeUTF(ref.getMessageId().toString()); IndexItem item = (IndexItem)ref.getMessageEntry(); dataOut.writeLong(item.getOffset()); item.write(dataOut); @@ -46,6 +48,7 @@ public class ConsumerMessageRefMarshaller implements Marshaller{ dataOut.writeLong(item.getOffset()); item.write(dataOut); + } /** @@ -56,6 +59,7 @@ public class ConsumerMessageRefMarshaller implements Marshaller{ */ public Object readPayload(DataInput dataIn) throws IOException{ ConsumerMessageRef ref = new ConsumerMessageRef(); + ref.setMessageId(new MessageId(dataIn.readUTF())); IndexItem item = new IndexItem(); item.setOffset(dataIn.readLong()); item.read(dataIn); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java index 07b2e53301..de72a35767 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java @@ -58,7 +58,8 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{ int subscriberCount=subscriberMessages.size(); if(subscriberCount>0){ - StoreEntry messageEntry=messageContainer.place(message.getMessageId(),message); + MessageId id = message.getMessageId(); + StoreEntry messageEntry=messageContainer.place(id,message); TopicSubAck tsa=new TopicSubAck(); tsa.setCount(subscriberCount); tsa.setMessageEntry(messageEntry); @@ -68,6 +69,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess ConsumerMessageRef ref=new ConsumerMessageRef(); ref.setAckEntry(ackEntry); ref.setMessageEntry(messageEntry); + ref.setMessageId(id); container.add(ref); } } @@ -78,7 +80,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess String subcriberId=getSubscriptionKey(clientId,subscriptionName); TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId); if(container!=null){ - ConsumerMessageRef ref=container.remove(); + ConsumerMessageRef ref=container.remove(messageId); if(container.isEmpty()){ container.reset(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java index 74be20329a..2ce8db6a74 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java @@ -68,30 +68,29 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic listener.recoverMessageReference(new MessageId(record.getMessageId())); } - public void addMessageReference(ConnectionContext context,MessageId messageId,ReferenceData data) - throws IOException{ - ReferenceRecord record=new ReferenceRecord(messageId.toString(),data); - int subscriberCount=subscriberMessages.size(); + public void addMessageReference(final ConnectionContext context,final MessageId messageId,final ReferenceData data){ + final ReferenceRecord record=new ReferenceRecord(messageId.toString(),data); + final int subscriberCount=subscriberMessages.size(); if(subscriberCount>0){ - StoreEntry messageEntry=messageContainer.place(messageId,record); + final StoreEntry messageEntry=messageContainer.place(messageId,record); addInterest(record); - TopicSubAck tsa=new TopicSubAck(); + final TopicSubAck tsa=new TopicSubAck(); tsa.setCount(subscriberCount); tsa.setMessageEntry(messageEntry); - StoreEntry ackEntry=ackContainer.placeLast(tsa); - for(Iterator i=subscriberMessages.values().iterator();i.hasNext();){ - TopicSubContainer container=(TopicSubContainer)i.next(); - ConsumerMessageRef ref=new ConsumerMessageRef(); + final StoreEntry ackEntry=ackContainer.placeLast(tsa); + for(final Iterator i=subscriberMessages.values().iterator();i.hasNext();){ + final TopicSubContainer container=(TopicSubContainer)i.next(); + final ConsumerMessageRef ref=new ConsumerMessageRef(); ref.setAckEntry(ackEntry); ref.setMessageEntry(messageEntry); - StoreEntry listEntry = container.add(ref); - + ref.setMessageId(messageId); + container.add(ref); } } } - public ReferenceData getMessageReference(MessageId identity) throws IOException{ - ReferenceRecord result=messageContainer.get(identity); + public ReferenceData getMessageReference(final MessageId identity) throws IOException{ + final ReferenceRecord result=messageContainer.get(identity); if(result==null) return null; return result.getData(); @@ -119,12 +118,10 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic public synchronized void acknowledge(ConnectionContext context,String clientId,String subscriptionName, MessageId messageId) throws IOException{ String key=getSubscriptionKey(clientId,subscriptionName); + TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key); if(container!=null){ - ConsumerMessageRef ref=container.remove(); - if(container.isEmpty()){ - container.reset(); - } + ConsumerMessageRef ref=container.remove(messageId); if(ref!=null){ TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry()); if(tsa!=null){ diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java index cd822e0465..9350ae9123 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java @@ -15,6 +15,7 @@ package org.apache.activemq.store.kahadaptor; import java.util.Iterator; +import org.apache.activemq.command.MessageId; import org.apache.activemq.kaha.ListContainer; import org.apache.activemq.kaha.StoreEntry; @@ -58,14 +59,17 @@ import org.apache.activemq.kaha.StoreEntry; return listContainer.placeLast(ref); } - public ConsumerMessageRef remove(){ + public ConsumerMessageRef remove(MessageId id){ ConsumerMessageRef result=null; if(!listContainer.isEmpty()){ - StoreEntry entry=listContainer.getFirst(); - if(entry!=null){ - result=(ConsumerMessageRef)listContainer.removeFirst(); - if(listContainer.isEmpty()||(batchEntry!=null&&batchEntry.equals(entry))){ - reset(); + for(StoreEntry entry=listContainer.getFirst();entry!=null;entry=listContainer.getNext(entry)){ + ConsumerMessageRef ref=(ConsumerMessageRef)listContainer.get(entry); + if(ref!=null&&ref.getMessageId().equals(id)){ + listContainer.remove(entry); + result=ref; + if(listContainer.isEmpty()||batchEntry.equals(entry)){ + reset(); + } } } }