check messageId when removing ConsumerMessageRef (pesky redo logs can add/delete things - breaking the assumption we always get a nice ordered behaviour for durable subs)

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@515630 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-03-07 16:15:08 +00:00
parent 0d9b413256
commit b3bbb9c2ea
5 changed files with 51 additions and 26 deletions

View File

@ -14,6 +14,7 @@
package org.apache.activemq.store.kahadaptor;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.StoreEntry;
/**
@ -23,6 +24,7 @@ import org.apache.activemq.kaha.StoreEntry;
*/
public class ConsumerMessageRef{
private MessageId messageId;
private StoreEntry messageEntry;
private StoreEntry ackEntry;
@ -54,5 +56,21 @@ public class ConsumerMessageRef{
this.messageEntry=messageEntry;
}
/**
* @return the messageId
*/
public MessageId getMessageId(){
return this.messageId;
}
/**
* @param messageId the messageId to set
*/
public void setMessageId(MessageId messageId){
this.messageId=messageId;
}
}

View File

@ -20,6 +20,7 @@ package org.apache.activemq.store.kahadaptor;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.impl.index.IndexItem;
@ -39,6 +40,7 @@ public class ConsumerMessageRefMarshaller implements Marshaller{
*/
public void writePayload(Object object,DataOutput dataOut) throws IOException{
ConsumerMessageRef ref = (ConsumerMessageRef) object;
dataOut.writeUTF(ref.getMessageId().toString());
IndexItem item = (IndexItem)ref.getMessageEntry();
dataOut.writeLong(item.getOffset());
item.write(dataOut);
@ -46,6 +48,7 @@ public class ConsumerMessageRefMarshaller implements Marshaller{
dataOut.writeLong(item.getOffset());
item.write(dataOut);
}
/**
@ -56,6 +59,7 @@ public class ConsumerMessageRefMarshaller implements Marshaller{
*/
public Object readPayload(DataInput dataIn) throws IOException{
ConsumerMessageRef ref = new ConsumerMessageRef();
ref.setMessageId(new MessageId(dataIn.readUTF()));
IndexItem item = new IndexItem();
item.setOffset(dataIn.readLong());
item.read(dataIn);

View File

@ -58,7 +58,8 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
int subscriberCount=subscriberMessages.size();
if(subscriberCount>0){
StoreEntry messageEntry=messageContainer.place(message.getMessageId(),message);
MessageId id = message.getMessageId();
StoreEntry messageEntry=messageContainer.place(id,message);
TopicSubAck tsa=new TopicSubAck();
tsa.setCount(subscriberCount);
tsa.setMessageEntry(messageEntry);
@ -68,6 +69,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
ConsumerMessageRef ref=new ConsumerMessageRef();
ref.setAckEntry(ackEntry);
ref.setMessageEntry(messageEntry);
ref.setMessageId(id);
container.add(ref);
}
}
@ -78,7 +80,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
String subcriberId=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId);
if(container!=null){
ConsumerMessageRef ref=container.remove();
ConsumerMessageRef ref=container.remove(messageId);
if(container.isEmpty()){
container.reset();
}

View File

@ -68,30 +68,29 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
listener.recoverMessageReference(new MessageId(record.getMessageId()));
}
public void addMessageReference(ConnectionContext context,MessageId messageId,ReferenceData data)
throws IOException{
ReferenceRecord record=new ReferenceRecord(messageId.toString(),data);
int subscriberCount=subscriberMessages.size();
public void addMessageReference(final ConnectionContext context,final MessageId messageId,final ReferenceData data){
final ReferenceRecord record=new ReferenceRecord(messageId.toString(),data);
final int subscriberCount=subscriberMessages.size();
if(subscriberCount>0){
StoreEntry messageEntry=messageContainer.place(messageId,record);
final StoreEntry messageEntry=messageContainer.place(messageId,record);
addInterest(record);
TopicSubAck tsa=new TopicSubAck();
final TopicSubAck tsa=new TopicSubAck();
tsa.setCount(subscriberCount);
tsa.setMessageEntry(messageEntry);
StoreEntry ackEntry=ackContainer.placeLast(tsa);
for(Iterator i=subscriberMessages.values().iterator();i.hasNext();){
TopicSubContainer container=(TopicSubContainer)i.next();
ConsumerMessageRef ref=new ConsumerMessageRef();
final StoreEntry ackEntry=ackContainer.placeLast(tsa);
for(final Iterator i=subscriberMessages.values().iterator();i.hasNext();){
final TopicSubContainer container=(TopicSubContainer)i.next();
final ConsumerMessageRef ref=new ConsumerMessageRef();
ref.setAckEntry(ackEntry);
ref.setMessageEntry(messageEntry);
StoreEntry listEntry = container.add(ref);
ref.setMessageId(messageId);
container.add(ref);
}
}
}
public ReferenceData getMessageReference(MessageId identity) throws IOException{
ReferenceRecord result=messageContainer.get(identity);
public ReferenceData getMessageReference(final MessageId identity) throws IOException{
final ReferenceRecord result=messageContainer.get(identity);
if(result==null)
return null;
return result.getData();
@ -119,12 +118,10 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
public synchronized void acknowledge(ConnectionContext context,String clientId,String subscriptionName,
MessageId messageId) throws IOException{
String key=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
if(container!=null){
ConsumerMessageRef ref=container.remove();
if(container.isEmpty()){
container.reset();
}
ConsumerMessageRef ref=container.remove(messageId);
if(ref!=null){
TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
if(tsa!=null){

View File

@ -15,6 +15,7 @@
package org.apache.activemq.store.kahadaptor;
import java.util.Iterator;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.StoreEntry;
@ -58,14 +59,17 @@ import org.apache.activemq.kaha.StoreEntry;
return listContainer.placeLast(ref);
}
public ConsumerMessageRef remove(){
public ConsumerMessageRef remove(MessageId id){
ConsumerMessageRef result=null;
if(!listContainer.isEmpty()){
StoreEntry entry=listContainer.getFirst();
if(entry!=null){
result=(ConsumerMessageRef)listContainer.removeFirst();
if(listContainer.isEmpty()||(batchEntry!=null&&batchEntry.equals(entry))){
reset();
for(StoreEntry entry=listContainer.getFirst();entry!=null;entry=listContainer.getNext(entry)){
ConsumerMessageRef ref=(ConsumerMessageRef)listContainer.get(entry);
if(ref!=null&&ref.getMessageId().equals(id)){
listContainer.remove(entry);
result=ref;
if(listContainer.isEmpty()||batchEntry.equals(entry)){
reset();
}
}
}
}