Fixed Queue cursor test case for AMQ Store

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@513921 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-03-02 20:00:11 +00:00
parent 55fa95460c
commit 31d327726a
6 changed files with 80 additions and 46 deletions

View File

@ -68,6 +68,7 @@ public class AMQMessageStore implements MessageStore{
protected HashSet<Location> inFlightTxLocations=new HashSet<Location>(); protected HashSet<Location> inFlightTxLocations=new HashSet<Location>();
protected final TaskRunner asyncWriteTask; protected final TaskRunner asyncWriteTask;
protected CountDownLatch flushLatch; protected CountDownLatch flushLatch;
private final boolean debug=log.isDebugEnabled();
private final AtomicReference<Location> mark=new AtomicReference<Location>(); private final AtomicReference<Location> mark=new AtomicReference<Location>();
public AMQMessageStore(AMQPersistenceAdapter adapter,ReferenceStore referenceStore,ActiveMQDestination destination){ public AMQMessageStore(AMQPersistenceAdapter adapter,ReferenceStore referenceStore,ActiveMQDestination destination){
@ -95,7 +96,7 @@ public class AMQMessageStore implements MessageStore{
*/ */
public void addMessage(ConnectionContext context,final Message message) throws IOException{ public void addMessage(ConnectionContext context,final Message message) throws IOException{
final MessageId id=message.getMessageId(); final MessageId id=message.getMessageId();
final boolean debug=log.isDebugEnabled();
final Location location=peristenceAdapter.writeCommand(message,message.isResponseRequired()); final Location location=peristenceAdapter.writeCommand(message,message.isResponseRequired());
if(!context.isInTransaction()){ if(!context.isInTransaction()){
if(debug) if(debug)
@ -168,7 +169,6 @@ public class AMQMessageStore implements MessageStore{
/** /**
*/ */
public void removeMessage(ConnectionContext context,final MessageAck ack) throws IOException{ public void removeMessage(ConnectionContext context,final MessageAck ack) throws IOException{
final boolean debug=log.isDebugEnabled();
JournalQueueAck remove=new JournalQueueAck(); JournalQueueAck remove=new JournalQueueAck();
remove.setDestination(destination); remove.setDestination(destination);
remove.setMessageAck(ack); remove.setMessageAck(ack);
@ -450,6 +450,7 @@ public class AMQMessageStore implements MessageStore{
} }
public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{ public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
/*
RecoveryListenerAdapter recoveryListener=new RecoveryListenerAdapter(this,listener); RecoveryListenerAdapter recoveryListener=new RecoveryListenerAdapter(this,listener);
if(referenceStore.supportsExternalBatchControl()){ if(referenceStore.supportsExternalBatchControl()){
synchronized(this){ synchronized(this){
@ -472,6 +473,13 @@ public class AMQMessageStore implements MessageStore{
flush(); flush();
referenceStore.recoverNextMessages(maxReturned,recoveryListener); referenceStore.recoverNextMessages(maxReturned,recoveryListener);
} }
*/
RecoveryListenerAdapter recoveryListener=new RecoveryListenerAdapter(this,listener);
referenceStore.recoverNextMessages(maxReturned,recoveryListener);
if(recoveryListener.size()==0&&recoveryListener.hasSpace()){
flush();
referenceStore.recoverNextMessages(maxReturned,recoveryListener);
}
} }
Message getMessage(ReferenceData data) throws IOException{ Message getMessage(ReferenceData data) throws IOException{

View File

@ -51,9 +51,7 @@ final class RecoveryListenerAdapter implements MessageRecoveryListener{
public void recoverMessageReference(MessageId ref) throws Exception{ public void recoverMessageReference(MessageId ref) throws Exception{
Message message=this.store.getMessage(ref); Message message=this.store.getMessage(ref);
if(message!=null){ if(message!=null){
listener.recoverMessage(message); recoverMessage(message);
count++;
lastRecovered=ref;
}else{ }else{
log.error("Message id "+ref+" could not be recovered from the data store!"); log.error("Message id "+ref+" could not be recovered from the data store!");
} }

View File

@ -32,7 +32,7 @@ public class KahaReferenceStore implements ReferenceStore{
protected final ActiveMQDestination destination; protected final ActiveMQDestination destination;
protected final MapContainer<MessageId,ReferenceRecord> messageContainer; protected final MapContainer<MessageId,ReferenceRecord> messageContainer;
protected KahaReferenceStoreAdapter adapter; protected KahaReferenceStoreAdapter adapter;
protected StoreEntry batchEntry=null; private StoreEntry batchEntry=null;
public KahaReferenceStore(KahaReferenceStoreAdapter adapter,MapContainer container,ActiveMQDestination destination) throws IOException{ public KahaReferenceStore(KahaReferenceStoreAdapter adapter,MapContainer container,ActiveMQDestination destination) throws IOException{
this.adapter = adapter; this.adapter = adapter;
@ -47,7 +47,7 @@ public class KahaReferenceStore implements ReferenceStore{
} }
protected MessageId getMessageId(Object object){ protected MessageId getMessageId(Object object){
return new MessageId(((ReferenceRecord)object).messageId); return new MessageId(((ReferenceRecord)object).getMessageId());
} }
public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{ public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
@ -60,13 +60,13 @@ public class KahaReferenceStore implements ReferenceStore{
protected void recover(MessageRecoveryListener listener,Object msg) throws Exception{ protected void recover(MessageRecoveryListener listener,Object msg) throws Exception{
ReferenceRecord record=(ReferenceRecord)msg; ReferenceRecord record=(ReferenceRecord)msg;
listener.recoverMessageReference(new MessageId(record.messageId)); listener.recoverMessageReference(new MessageId(record.getMessageId()));
} }
public synchronized void recover(MessageRecoveryListener listener) throws Exception{ public synchronized void recover(MessageRecoveryListener listener) throws Exception{
for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){ for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
ReferenceRecord record=messageContainer.getValue(entry); ReferenceRecord record=messageContainer.getValue(entry);
recover(listener,new MessageId(record.messageId)); recover(listener,new MessageId(record.getMessageId()));
} }
listener.finished(); listener.finished();
} }
@ -78,9 +78,6 @@ public class KahaReferenceStore implements ReferenceStore{
}else{ }else{
entry=messageContainer.refresh(entry); entry=messageContainer.refresh(entry);
entry=messageContainer.getNext(entry); entry=messageContainer.getNext(entry);
if (entry==null) {
batchEntry=null;
}
} }
if(entry!=null){ if(entry!=null){
int count=0; int count=0;
@ -108,7 +105,7 @@ public class KahaReferenceStore implements ReferenceStore{
ReferenceRecord result=messageContainer.get(identity); ReferenceRecord result=messageContainer.get(identity);
if(result==null) if(result==null)
return null; return null;
return result.data; return result.getData();
} }
public void addReferenceFileIdsInUse(){ public void addReferenceFileIdsInUse(){
@ -124,11 +121,13 @@ public class KahaReferenceStore implements ReferenceStore{
public synchronized void removeMessage(MessageId msgId) throws IOException{ public synchronized void removeMessage(MessageId msgId) throws IOException{
ReferenceRecord rr=messageContainer.remove(msgId); ReferenceRecord rr=messageContainer.remove(msgId);
if(rr!=null){
removeInterest(rr); removeInterest(rr);
if(messageContainer.isEmpty()){ if(messageContainer.isEmpty()){
resetBatching(); resetBatching();
} }
} }
}
public synchronized void removeAllMessages(ConnectionContext context) throws IOException{ public synchronized void removeAllMessages(ConnectionContext context) throws IOException{
messageContainer.clear(); messageContainer.clear();
@ -157,27 +156,23 @@ public class KahaReferenceStore implements ReferenceStore{
return true; return true;
} }
/**
* @param startAfter
* @see org.apache.activemq.store.ReferenceStore#setBatch(org.apache.activemq.command.MessageId)
*/
public void setBatch(MessageId startAfter){
resetBatching();
if (startAfter != null) {
batchEntry = messageContainer.getEntry(startAfter);
}
}
public boolean supportsExternalBatchControl(){ public boolean supportsExternalBatchControl(){
return true; return true;
} }
void removeInterest(ReferenceRecord rr) { void removeInterest(ReferenceRecord rr) {
adapter.removeInterestInRecordFile(rr.data.getFileId()); adapter.removeInterestInRecordFile(rr.getData().getFileId());
} }
void addInterest(ReferenceRecord rr) { void addInterest(ReferenceRecord rr) {
adapter.addInterestInRecordFile(rr.data.getFileId()); adapter.addInterestInRecordFile(rr.getData().getFileId());
}
/**
* @param startAfter
* @see org.apache.activemq.store.ReferenceStore#setBatch(org.apache.activemq.command.MessageId)
*/
public void setBatch(MessageId startAfter){
} }
} }

View File

@ -53,7 +53,7 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
} }
protected MessageId getMessageId(Object object){ protected MessageId getMessageId(Object object){
return new MessageId(((ReferenceRecord)object).messageId); return new MessageId(((ReferenceRecord)object).getMessageId());
} }
public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{ public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
@ -66,7 +66,7 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
protected void recover(MessageRecoveryListener listener,Object msg) throws Exception{ protected void recover(MessageRecoveryListener listener,Object msg) throws Exception{
ReferenceRecord record=(ReferenceRecord)msg; ReferenceRecord record=(ReferenceRecord)msg;
listener.recoverMessageReference(new MessageId(record.messageId)); listener.recoverMessageReference(new MessageId(record.getMessageId()));
} }
public void addMessageReference(ConnectionContext context,MessageId messageId,ReferenceData data) public void addMessageReference(ConnectionContext context,MessageId messageId,ReferenceData data)
@ -94,7 +94,7 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
ReferenceRecord result=messageContainer.get(identity); ReferenceRecord result=messageContainer.get(identity);
if(result==null) if(result==null)
return null; return null;
return result.data; return result.getData();
} }
public void addReferenceFileIdsInUse(){ public void addReferenceFileIdsInUse(){

View File

@ -18,8 +18,8 @@ import org.apache.activemq.store.ReferenceStore.ReferenceData;
public class ReferenceRecord{ public class ReferenceRecord{
public String messageId; private String messageId;
public ReferenceData data; private ReferenceData data;
public ReferenceRecord(){ public ReferenceRecord(){
} }
@ -28,4 +28,36 @@ public class ReferenceRecord{
this.messageId=messageId; this.messageId=messageId;
this.data=data; this.data=data;
} }
/**
* @return the data
*/
public ReferenceData getData(){
return this.data;
}
/**
* @param data the data to set
*/
public void setData(ReferenceData data){
this.data=data;
}
/**
* @return the messageId
*/
public String getMessageId(){
return this.messageId;
}
/**
* @param messageId the messageId to set
*/
public void setMessageId(String messageId){
this.messageId=messageId;
}
} }

View File

@ -24,11 +24,12 @@ public class ReferenceRecordMarshaller implements Marshaller<ReferenceRecord>{
public ReferenceRecord readPayload(DataInput dataIn) throws IOException{ public ReferenceRecord readPayload(DataInput dataIn) throws IOException{
ReferenceRecord rr=new ReferenceRecord(); ReferenceRecord rr=new ReferenceRecord();
rr.messageId=dataIn.readUTF(); rr.setMessageId(dataIn.readUTF());
rr.data=new ReferenceData(); ReferenceData referenceData = new ReferenceData();
rr.data.setFileId(dataIn.readInt()); referenceData.setFileId(dataIn.readInt());
rr.data.setOffset(dataIn.readInt()); referenceData.setOffset(dataIn.readInt());
rr.data.setExpiration(dataIn.readLong()); referenceData.setExpiration(dataIn.readLong());
rr.setData(referenceData);
return rr; return rr;
} }
@ -39,9 +40,9 @@ public class ReferenceRecordMarshaller implements Marshaller<ReferenceRecord>{
* @see org.apache.activemq.kaha.Marshaller#writePayload(java.lang.Object, java.io.DataOutput) * @see org.apache.activemq.kaha.Marshaller#writePayload(java.lang.Object, java.io.DataOutput)
*/ */
public void writePayload(ReferenceRecord rr,DataOutput dataOut) throws IOException{ public void writePayload(ReferenceRecord rr,DataOutput dataOut) throws IOException{
dataOut.writeUTF(rr.messageId); dataOut.writeUTF(rr.getMessageId());
dataOut.writeInt(rr.data.getFileId()); dataOut.writeInt(rr.getData().getFileId());
dataOut.writeInt(rr.data.getOffset()); dataOut.writeInt(rr.getData().getOffset());
dataOut.writeLong(rr.data.getExpiration()); dataOut.writeLong(rr.getData().getExpiration());
} }
} }