From ced50c994004c952a38fe026df29868f528d22c7 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Thu, 4 Jan 2007 10:39:04 +0000 Subject: [PATCH] Quick store bug fixes. git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@492511 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/store/ReferenceStore.java | 5 ++++ .../kahadaptor/KahaTopicReferenceStore.java | 2 +- .../store/quick/QuickMessageStore.java | 28 +++++++++++-------- .../store/quick/QuickPersistenceAdapter.java | 18 ++++-------- .../store/quick/QuickTransactionStore.java | 14 ++++++---- 5 files changed, 36 insertions(+), 31 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java b/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java index ab4ca614a2..f68ca06d1a 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java @@ -54,6 +54,11 @@ public interface ReferenceStore extends MessageStore { public void setOffset(int offset) { this.offset = offset; } + + @Override + public String toString() { + return "ReferenceData fileId="+fileId+", offset="+offset+", expiration="+expiration; + } } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java index 156a76f71d..e744b36330 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java @@ -95,7 +95,7 @@ public class KahaTopicReferenceStore extends KahaTopicMessageStore implements To }else{ for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) { ReferenceRecord msg=(ReferenceRecord)messageContainer.get(entry); - if(msg.messageId.equals(identity)){ + if(msg.messageId.equals(identity.toString())){ result=msg; cache.put(identity,entry); break; diff --git a/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java index 073062c173..78ffb2714b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.DataStructure; import org.apache.activemq.command.JournalQueueAck; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; @@ -167,6 +168,8 @@ public class QuickMessageStore implements MessageStore { data.setFileId(location.getDataFileId()); data.setOffset(location.getOffset()); referenceStore.addMessageReference(context, id, data); + System.out.println("referenceStore.put "+id+"-->"+data); + } } catch (Throwable e) { @@ -332,6 +335,7 @@ public class QuickMessageStore implements MessageStore { Entry entry = iterator.next(); try { referenceStore.addMessageReference(context, entry.getKey(), entry.getValue() ); + System.out.println("referenceStore.put "+entry.getKey()+"-->"+entry.getValue()); } catch (Throwable e) { log.warn("Message could not be added to long term store: " + e.getMessage(), e); } @@ -394,21 +398,23 @@ public class QuickMessageStore implements MessageStore { if( data==null ) { data = referenceStore.getMessageReference(identity); + System.out.println("referenceStore.get "+identity+"-->"+data); + if( data==null ) { + return null; + } } - - if( data==null ) { - return null; - } - - Message answer = null; - if (answer != null ) { - return answer; - } - + Location location = new Location(); location.setDataFileId(data.getFileId()); location.setOffset(data.getOffset()); - return (Message) peristenceAdapter.readCommand(location); + + DataStructure rc = peristenceAdapter.readCommand(location); + + try { + return (Message) rc; + } catch (ClassCastException e) { + throw new IOException("Could not read message "+identity+" at location "+location+", expected a message, but got: "+rc); + } } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java index bdff7506f4..1b5c8e7c16 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java @@ -88,7 +88,7 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene private UsageManager usageManager; - private long cleanupInterval = 1000 * 10; + private long cleanupInterval = 1000 * 1/10; private long checkpointInterval = 1000 * 10; private int maxCheckpointWorkers = 1; @@ -147,15 +147,7 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene referenceStoreAdapter.start(); Set files = referenceStoreAdapter.getReferenceFileIdsInUse(); - for (Integer fileId : files) { - try { - asyncDataManager.addInterestInFile(fileId); - } catch (IOException e) { - // We can expect these since referenceStoreAdapter is a litle behind in updates - // and it might think it has references to data files that have allready come and gone.. - // This should get resolved once recovery kicks in. - } - } + log.info("Active data files: "+files); checkpointTask = taskRunnerFactory.createTaskRunner(new Task(){ public boolean iterate() { @@ -172,8 +164,7 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene public void run() { checkpoint(false); } - }; - + }; Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval); periodicCleanupTask = new Runnable() { @@ -193,6 +184,7 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene this.usageManager.removeUsageListener(this); Scheduler.cancel(periodicCheckpointTask); + Scheduler.cancel(periodicCleanupTask); Iterator iterator = queues.values().iterator(); @@ -499,7 +491,7 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) { TxOperation op = (TxOperation) iter.next(); if (op.operationType == TxOperation.ADD_OPERATION_TYPE) { - op.store.replayAddMessage(context, (Message)op.data, pos); + op.store.replayAddMessage(context, (Message)op.data, op.location); } if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) { op.store.replayRemoveMessage(context, (MessageAck) op.data); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTransactionStore.java b/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTransactionStore.java index ffef67fbe4..c1ff17c91d 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTransactionStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTransactionStore.java @@ -56,11 +56,13 @@ public class QuickTransactionStore implements TransactionStore { public byte operationType; public QuickMessageStore store; public Object data; + public Location location; - public TxOperation(byte operationType, QuickMessageStore store, Object data) { + public TxOperation(byte operationType, QuickMessageStore store, Object data, Location location) { this.operationType=operationType; this.store=store; this.data=data; + this.location=location; } } @@ -77,16 +79,16 @@ public class QuickTransactionStore implements TransactionStore { this.location=location; } - public void add(QuickMessageStore store, Message msg) { - operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg)); + public void add(QuickMessageStore store, Message msg, Location location) { + operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg, location)); } public void add(QuickMessageStore store, MessageAck ack) { - operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack)); + operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack, null)); } public void add(QuickTopicMessageStore store, JournalTopicAck ack) { - operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack)); + operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack, null)); } public Message[] getMessages() { @@ -283,7 +285,7 @@ public class QuickTransactionStore implements TransactionStore { */ void addMessage(QuickMessageStore store, Message message, Location location) throws IOException { Tx tx = getTx(message.getTransactionId(), location); - tx.add(store, message); + tx.add(store, message, location); } /**