From 31d327726aceea1ed0b7b90609ddf5aa799befce Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Fri, 2 Mar 2007 20:00:11 +0000 Subject: [PATCH] Fixed Queue cursor test case for AMQ Store git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@513921 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/store/amq/AMQMessageStore.java | 14 ++++-- .../store/amq/RecoveryListenerAdapter.java | 4 +- .../store/kahadaptor/KahaReferenceStore.java | 47 +++++++++---------- .../kahadaptor/KahaTopicReferenceStore.java | 6 +-- .../store/kahadaptor/ReferenceRecord.java | 36 +++++++++++++- .../kahadaptor/ReferenceRecordMarshaller.java | 19 ++++---- 6 files changed, 80 insertions(+), 46 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java index 8aa001ed6e..43ac23d62f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java @@ -68,8 +68,9 @@ public class AMQMessageStore implements MessageStore{ protected HashSet inFlightTxLocations=new HashSet(); protected final TaskRunner asyncWriteTask; protected CountDownLatch flushLatch; + private final boolean debug=log.isDebugEnabled(); private final AtomicReference mark=new AtomicReference(); - + public AMQMessageStore(AMQPersistenceAdapter adapter,ReferenceStore referenceStore,ActiveMQDestination destination){ this.peristenceAdapter=adapter; this.transactionStore=adapter.getTransactionStore(); @@ -95,7 +96,7 @@ public class AMQMessageStore implements MessageStore{ */ public void addMessage(ConnectionContext context,final Message message) throws IOException{ final MessageId id=message.getMessageId(); - final boolean debug=log.isDebugEnabled(); + final Location location=peristenceAdapter.writeCommand(message,message.isResponseRequired()); if(!context.isInTransaction()){ if(debug) @@ -168,7 +169,6 @@ public class AMQMessageStore implements MessageStore{ /** */ public void removeMessage(ConnectionContext context,final MessageAck ack) throws IOException{ - final boolean debug=log.isDebugEnabled(); JournalQueueAck remove=new JournalQueueAck(); remove.setDestination(destination); remove.setMessageAck(ack); @@ -450,6 +450,7 @@ public class AMQMessageStore implements MessageStore{ } public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{ + /* RecoveryListenerAdapter recoveryListener=new RecoveryListenerAdapter(this,listener); if(referenceStore.supportsExternalBatchControl()){ synchronized(this){ @@ -472,6 +473,13 @@ public class AMQMessageStore implements MessageStore{ flush(); referenceStore.recoverNextMessages(maxReturned,recoveryListener); } + */ + RecoveryListenerAdapter recoveryListener=new RecoveryListenerAdapter(this,listener); + referenceStore.recoverNextMessages(maxReturned,recoveryListener); + if(recoveryListener.size()==0&&recoveryListener.hasSpace()){ + flush(); + referenceStore.recoverNextMessages(maxReturned,recoveryListener); + } } Message getMessage(ReferenceData data) throws IOException{ diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java index 9a2a1f51b1..b04124e4d4 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java @@ -51,9 +51,7 @@ final class RecoveryListenerAdapter implements MessageRecoveryListener{ public void recoverMessageReference(MessageId ref) throws Exception{ Message message=this.store.getMessage(ref); if(message!=null){ - listener.recoverMessage(message); - count++; - lastRecovered=ref; + recoverMessage(message); }else{ log.error("Message id "+ref+" could not be recovered from the data store!"); } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java index 188919ab4c..6acd860c0c 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java @@ -32,7 +32,7 @@ public class KahaReferenceStore implements ReferenceStore{ protected final ActiveMQDestination destination; protected final MapContainer messageContainer; protected KahaReferenceStoreAdapter adapter; - protected StoreEntry batchEntry=null; + private StoreEntry batchEntry=null; public KahaReferenceStore(KahaReferenceStoreAdapter adapter,MapContainer container,ActiveMQDestination destination) throws IOException{ this.adapter = adapter; @@ -47,7 +47,7 @@ public class KahaReferenceStore implements ReferenceStore{ } protected MessageId getMessageId(Object object){ - return new MessageId(((ReferenceRecord)object).messageId); + return new MessageId(((ReferenceRecord)object).getMessageId()); } public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{ @@ -60,13 +60,13 @@ public class KahaReferenceStore implements ReferenceStore{ protected void recover(MessageRecoveryListener listener,Object msg) throws Exception{ ReferenceRecord record=(ReferenceRecord)msg; - listener.recoverMessageReference(new MessageId(record.messageId)); + listener.recoverMessageReference(new MessageId(record.getMessageId())); } public synchronized void recover(MessageRecoveryListener listener) throws Exception{ for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){ ReferenceRecord record=messageContainer.getValue(entry); - recover(listener,new MessageId(record.messageId)); + recover(listener,new MessageId(record.getMessageId())); } listener.finished(); } @@ -78,9 +78,6 @@ public class KahaReferenceStore implements ReferenceStore{ }else{ entry=messageContainer.refresh(entry); entry=messageContainer.getNext(entry); - if (entry==null) { - batchEntry=null; - } } if(entry!=null){ int count=0; @@ -108,7 +105,7 @@ public class KahaReferenceStore implements ReferenceStore{ ReferenceRecord result=messageContainer.get(identity); if(result==null) return null; - return result.data; + return result.getData(); } public void addReferenceFileIdsInUse(){ @@ -123,10 +120,12 @@ public class KahaReferenceStore implements ReferenceStore{ } public synchronized void removeMessage(MessageId msgId) throws IOException{ - ReferenceRecord rr = messageContainer.remove(msgId); - removeInterest(rr); - if(messageContainer.isEmpty()){ - resetBatching(); + ReferenceRecord rr=messageContainer.remove(msgId); + if(rr!=null){ + removeInterest(rr); + if(messageContainer.isEmpty()){ + resetBatching(); + } } } @@ -157,27 +156,23 @@ public class KahaReferenceStore implements ReferenceStore{ return true; } - /** - * @param startAfter - * @see org.apache.activemq.store.ReferenceStore#setBatch(org.apache.activemq.command.MessageId) - */ - public void setBatch(MessageId startAfter){ - resetBatching(); - if (startAfter != null) { - batchEntry = messageContainer.getEntry(startAfter); - } - - } - + public boolean supportsExternalBatchControl(){ return true; } void removeInterest(ReferenceRecord rr) { - adapter.removeInterestInRecordFile(rr.data.getFileId()); + adapter.removeInterestInRecordFile(rr.getData().getFileId()); } void addInterest(ReferenceRecord rr) { - adapter.addInterestInRecordFile(rr.data.getFileId()); + adapter.addInterestInRecordFile(rr.getData().getFileId()); + } + + /** + * @param startAfter + * @see org.apache.activemq.store.ReferenceStore#setBatch(org.apache.activemq.command.MessageId) + */ + public void setBatch(MessageId startAfter){ } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java index e90fc46aaa..434008a93e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java @@ -53,7 +53,7 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic } protected MessageId getMessageId(Object object){ - return new MessageId(((ReferenceRecord)object).messageId); + return new MessageId(((ReferenceRecord)object).getMessageId()); } public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{ @@ -66,7 +66,7 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic protected void recover(MessageRecoveryListener listener,Object msg) throws Exception{ ReferenceRecord record=(ReferenceRecord)msg; - listener.recoverMessageReference(new MessageId(record.messageId)); + listener.recoverMessageReference(new MessageId(record.getMessageId())); } public void addMessageReference(ConnectionContext context,MessageId messageId,ReferenceData data) @@ -94,7 +94,7 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic ReferenceRecord result=messageContainer.get(identity); if(result==null) return null; - return result.data; + return result.getData(); } public void addReferenceFileIdsInUse(){ diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecord.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecord.java index acc60a8099..ffdf2e7e06 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecord.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecord.java @@ -18,8 +18,8 @@ import org.apache.activemq.store.ReferenceStore.ReferenceData; public class ReferenceRecord{ - public String messageId; - public ReferenceData data; + private String messageId; + private ReferenceData data; public ReferenceRecord(){ } @@ -28,4 +28,36 @@ public class ReferenceRecord{ this.messageId=messageId; this.data=data; } + + + /** + * @return the data + */ + public ReferenceData getData(){ + return this.data; + } + + + /** + * @param data the data to set + */ + public void setData(ReferenceData data){ + this.data=data; + } + + + /** + * @return the messageId + */ + public String getMessageId(){ + return this.messageId; + } + + + /** + * @param messageId the messageId to set + */ + public void setMessageId(String messageId){ + this.messageId=messageId; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecordMarshaller.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecordMarshaller.java index 96898b4a91..fcb12f4ba0 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecordMarshaller.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecordMarshaller.java @@ -24,11 +24,12 @@ public class ReferenceRecordMarshaller implements Marshaller{ public ReferenceRecord readPayload(DataInput dataIn) throws IOException{ ReferenceRecord rr=new ReferenceRecord(); - rr.messageId=dataIn.readUTF(); - rr.data=new ReferenceData(); - rr.data.setFileId(dataIn.readInt()); - rr.data.setOffset(dataIn.readInt()); - rr.data.setExpiration(dataIn.readLong()); + rr.setMessageId(dataIn.readUTF()); + ReferenceData referenceData = new ReferenceData(); + referenceData.setFileId(dataIn.readInt()); + referenceData.setOffset(dataIn.readInt()); + referenceData.setExpiration(dataIn.readLong()); + rr.setData(referenceData); return rr; } @@ -39,9 +40,9 @@ public class ReferenceRecordMarshaller implements Marshaller{ * @see org.apache.activemq.kaha.Marshaller#writePayload(java.lang.Object, java.io.DataOutput) */ public void writePayload(ReferenceRecord rr,DataOutput dataOut) throws IOException{ - dataOut.writeUTF(rr.messageId); - dataOut.writeInt(rr.data.getFileId()); - dataOut.writeInt(rr.data.getOffset()); - dataOut.writeLong(rr.data.getExpiration()); + dataOut.writeUTF(rr.getMessageId()); + dataOut.writeInt(rr.getData().getFileId()); + dataOut.writeInt(rr.getData().getOffset()); + dataOut.writeLong(rr.getData().getExpiration()); } }