check we don't keep hold of a batch entry after its been deleted

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@547906 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-06-16 11:27:48 +00:00
parent 4d1a176784
commit ebcf2202fe
5 changed files with 45 additions and 35 deletions

View File

@ -66,8 +66,8 @@ public class KahaMessageStore implements MessageStore{
return result;
}
protected void recover(MessageRecoveryListener listener,Object msg) throws Exception{
listener.recoverMessage((Message)msg);
protected void recoverMessage(MessageRecoveryListener listener,Message msg) throws Exception{
listener.recoverMessage(msg);
}
public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{
@ -89,7 +89,7 @@ public class KahaMessageStore implements MessageStore{
public synchronized void recover(MessageRecoveryListener listener) throws Exception{
for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
Message msg=(Message)messageContainer.getValue(entry);
recover(listener,msg);
recoverMessage(listener,msg);
}
listener.finished();
}
@ -158,9 +158,9 @@ public class KahaMessageStore implements MessageStore{
if(entry!=null){
int count=0;
do{
Object msg=messageContainer.getValue(entry);
Message msg=messageContainer.getValue(entry);
if(msg!=null){
recover(listener,msg);
recoverMessage(listener,msg);
count++;
}
batchEntry=entry;

View File

@ -15,7 +15,6 @@
package org.apache.activemq.store.kahadaptor;
import java.io.IOException;
import java.util.Set;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@ -33,6 +32,7 @@ public class KahaReferenceStore implements ReferenceStore{
protected final MapContainer<MessageId,ReferenceRecord> messageContainer;
protected KahaReferenceStoreAdapter adapter;
private StoreEntry batchEntry=null;
private String lastBatchId=null;
public KahaReferenceStore(KahaReferenceStoreAdapter adapter,MapContainer container,ActiveMQDestination destination) throws IOException{
this.adapter = adapter;
@ -58,15 +58,14 @@ public class KahaReferenceStore implements ReferenceStore{
throw new RuntimeException("Use addMessageReference instead");
}
protected void recover(MessageRecoveryListener listener,Object msg) throws Exception{
ReferenceRecord record=(ReferenceRecord)msg;
protected final void recoverReference(MessageRecoveryListener listener,ReferenceRecord record) throws Exception{
listener.recoverMessageReference(new MessageId(record.getMessageId()));
}
public synchronized void recover(MessageRecoveryListener listener) throws Exception{
for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
ReferenceRecord record=messageContainer.getValue(entry);
recover(listener,new MessageId(record.getMessageId()));
recoverReference(listener,record);
}
listener.finished();
}
@ -77,17 +76,20 @@ public class KahaReferenceStore implements ReferenceStore{
entry=messageContainer.getFirst();
}else{
entry=messageContainer.refresh(entry);
if (entry != null) {
entry=messageContainer.getNext(entry);
if(entry!=null){
entry=messageContainer.getNext(entry);
}
}
if(entry!=null){
int count=0;
do{
Object msg=messageContainer.getValue(entry);
ReferenceRecord msg=messageContainer.getValue(entry);
if(msg!=null){
recover(listener,msg);
recoverReference(listener,msg);
count++;
lastBatchId=msg.getMessageId();
}else{
lastBatchId=null;
}
batchEntry=entry;
entry=messageContainer.getNext(entry);
@ -96,14 +98,14 @@ public class KahaReferenceStore implements ReferenceStore{
listener.finished();
}
public void addMessageReference(ConnectionContext context,MessageId messageId,ReferenceData data)
public synchronized void addMessageReference(ConnectionContext context,MessageId messageId,ReferenceData data)
throws IOException{
ReferenceRecord record=new ReferenceRecord(messageId.toString(),data);
messageContainer.put(messageId,record);
addInterest(record);
}
public ReferenceData getMessageReference(MessageId identity) throws IOException{
public synchronized ReferenceData getMessageReference(MessageId identity) throws IOException{
ReferenceRecord result=messageContainer.get(identity);
if(result==null)
return null;
@ -127,7 +129,8 @@ public class KahaReferenceStore implements ReferenceStore{
ReferenceRecord rr=messageContainer.remove(msgId);
if(rr!=null){
removeInterest(rr);
if(messageContainer.isEmpty()||(batchEntry!=null&&batchEntry.equals(entry))){
if(messageContainer.isEmpty()||(lastBatchId!=null&&lastBatchId.equals(msgId.toString()))
||(batchEntry!=null&&batchEntry.equals(entry))){
resetBatching();
}
}
@ -148,6 +151,7 @@ public class KahaReferenceStore implements ReferenceStore{
public void resetBatching(){
batchEntry=null;
lastBatchId=null;
}
public int getMessageCount(){

View File

@ -146,9 +146,9 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
if(container!=null){
for(Iterator i=container.iterator();i.hasNext();){
ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
Object msg=messageContainer.get(ref.getMessageEntry());
Message msg=messageContainer.get(ref.getMessageEntry());
if(msg!=null){
recover(listener, msg);
recoverMessage(listener, msg);
}
}
}
@ -173,12 +173,15 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
if(entry!=null){
do{
ConsumerMessageRef consumerRef=container.get(entry);
Object msg=messageContainer.getValue(consumerRef.getMessageEntry());
Message msg=messageContainer.getValue(consumerRef.getMessageEntry());
if(msg!=null){
recover(listener, msg);
recoverMessage(listener, msg);
count++;
container.setBatchEntry(msg.getMessageId().toString(),entry);
}else {
container.reset();
}
container.setBatchEntry(entry);
entry=container.getNextEntry(entry);
}while(entry!=null&&count<maxReturned&&listener.hasSpace());
}

View File

@ -63,11 +63,7 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
throw new RuntimeException("Use addMessageReference instead");
}
protected void recover(MessageRecoveryListener listener,Object msg) throws Exception{
ReferenceRecord record=(ReferenceRecord)msg;
listener.recoverMessageReference(new MessageId(record.getMessageId()));
}
public void addMessageReference(final ConnectionContext context,final MessageId messageId,final ReferenceData data){
final ReferenceRecord record=new ReferenceRecord(messageId.toString(),data);
final int subscriberCount=subscriberMessages.size();
@ -193,7 +189,7 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
}
public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
public synchronized void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
MessageRecoveryListener listener) throws Exception{
String key=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
@ -208,15 +204,19 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
entry=container.getNextEntry(entry);
}
}
if(entry!=null){
do{
ConsumerMessageRef consumerRef=container.get(entry);
Object msg=messageContainer.getValue(consumerRef.getMessageEntry());
ReferenceRecord msg=messageContainer.getValue(consumerRef.getMessageEntry());
if(msg!=null){
recover(listener,msg);
recoverReference(listener,msg);
count++;
container.setBatchEntry(msg.getMessageId().toString(),entry);
}else {
container.reset();
}
container.setBatchEntry(entry);
entry=container.getNextEntry(entry);
}while(entry!=null&&count<maxReturned&&listener.hasSpace());
}
@ -232,9 +232,9 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
if(container!=null){
for(Iterator i=container.iterator();i.hasNext();){
ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
Object msg=messageContainer.get(ref.getMessageEntry());
ReferenceRecord msg=messageContainer.get(ref.getMessageEntry());
if(msg!=null){
recover(listener,msg);
recoverReference(listener,msg);
}
}
}

View File

@ -25,8 +25,9 @@ import java.util.Iterator;
* @version $Revision: 1.10 $
*/
public class TopicSubContainer {
private ListContainer listContainer;
private StoreEntry batchEntry;
private transient ListContainer listContainer;
private transient StoreEntry batchEntry;
private transient String lastBatchId;
public TopicSubContainer(ListContainer container) {
this.listContainer = container;
@ -42,11 +43,13 @@ public class TopicSubContainer {
/**
* @param batchEntry the batchEntry to set
*/
public void setBatchEntry(StoreEntry batchEntry) {
public void setBatchEntry(String id,StoreEntry batchEntry) {
this.lastBatchId=id;
this.batchEntry = batchEntry;
}
public void reset() {
lastBatchId=null;
batchEntry = null;
}