From f8cb8475ed2a3eaf1ff16f54a348c247c69a27c3 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Wed, 13 Jan 2010 09:20:24 +0000 Subject: [PATCH] expose the Journal property archiveDataLogs through KahaDB git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@898689 13f79535-47bb-0310-9956-ffa450edef68 --- .../kahadb/KahaDBPersistenceAdapter.java | 23 +++++++++++++++---- .../store/kahadb/MessageDatabase.java | 23 +++++++++++++++++-- 2 files changed, 40 insertions(+), 6 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java index 38c1b3a71b..3dd13f9e8d 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java @@ -16,6 +16,9 @@ */ package org.apache.activemq.store.kahadb; +import java.io.File; +import java.io.IOException; +import java.util.Set; import org.apache.activeio.journal.Journal; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; @@ -28,9 +31,6 @@ import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TransactionStore; import org.apache.activemq.usage.SystemUsage; -import java.io.File; -import java.io.IOException; -import java.util.Set; /** * An implementation of {@link PersistenceAdapter} designed for use with a * {@link Journal} and then check pointing asynchronously on a timeout with some @@ -40,7 +40,7 @@ import java.util.Set; * @version $Revision: 1.17 $ */ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware { - private KahaDBStore letter = new KahaDBStore(); + private final KahaDBStore letter = new KahaDBStore(); /** @@ -370,4 +370,19 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi public void setBrokerService(BrokerService brokerService) { letter.setBrokerService(brokerService); } + + + /** + * @return the archiveDataLogs + */ + public boolean isArchiveDataLogs() { + return letter.isArchiveDataLogs(); + } + + /** + * @param archiveDataLogs the archiveDataLogs to set + */ + public void setArchiveDataLogs(boolean archiveDataLogs) { + letter.setArchiveDataLogs(archiveDataLogs); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index e98744d583..f719700599 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -35,7 +35,6 @@ import java.util.TreeMap; import java.util.TreeSet; import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.command.ConnectionId; @@ -77,7 +76,6 @@ import org.apache.kahadb.util.Sequence; import org.apache.kahadb.util.SequenceSet; import org.apache.kahadb.util.StringMarshaller; import org.apache.kahadb.util.VariableMarshaller; -import org.springframework.core.enums.LetterCodedLabeledEnum; public class MessageDatabase implements BrokerServiceAware { @@ -159,6 +157,7 @@ public class MessageDatabase implements BrokerServiceAware { protected File directory; protected Thread checkpointThread; protected boolean enableJournalDiskSyncs=true; + protected boolean archiveDataLogs; long checkpointInterval = 5*1000; long cleanupInterval = 30*1000; int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; @@ -173,6 +172,7 @@ public class MessageDatabase implements BrokerServiceAware { private int indexCacheSize = 100; private boolean checkForCorruptJournalFiles = false; private boolean checksumJournalFiles = false; + public MessageDatabase() { } @@ -234,6 +234,7 @@ public class MessageDatabase implements BrokerServiceAware { private void startCheckpoint() { checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") { + @Override public void run() { try { long lastCleanup = System.currentTimeMillis(); @@ -510,6 +511,7 @@ public class MessageDatabase implements BrokerServiceAware { final ArrayList matches = new ArrayList(); sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor(missingPredicates) { + @Override protected void matched(Location key, Long value) { matches.add(value); } @@ -1375,6 +1377,7 @@ public class MessageDatabase implements BrokerServiceAware { this.command = command; } + @Override public void execute(Transaction tx) throws IOException { upadateIndex(tx, command, location); } @@ -1392,6 +1395,7 @@ public class MessageDatabase implements BrokerServiceAware { this.command = command; } + @Override public void execute(Transaction tx) throws IOException { updateIndex(tx, command, location); } @@ -1420,6 +1424,7 @@ public class MessageDatabase implements BrokerServiceAware { manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles); manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles); manager.setWriteBatchSize(getJournalMaxWriteBatchSize()); + manager.setArchiveDataLogs(isArchiveDataLogs()); return manager; } @@ -1552,4 +1557,18 @@ public class MessageDatabase implements BrokerServiceAware { public void setBrokerService(BrokerService brokerService) { this.brokerService = brokerService; } + + /** + * @return the archiveDataLogs + */ + public boolean isArchiveDataLogs() { + return this.archiveDataLogs; + } + + /** + * @param archiveDataLogs the archiveDataLogs to set + */ + public void setArchiveDataLogs(boolean archiveDataLogs) { + this.archiveDataLogs = archiveDataLogs; + } }