mirror of https://github.com/apache/activemq.git
expose the Journal property archiveDataLogs through KahaDB
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@898689 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9cf216cce1
commit
f8cb8475ed
|
@ -16,6 +16,9 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.store.kahadb;
|
package org.apache.activemq.store.kahadb;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Set;
|
||||||
import org.apache.activeio.journal.Journal;
|
import org.apache.activeio.journal.Journal;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.BrokerServiceAware;
|
import org.apache.activemq.broker.BrokerServiceAware;
|
||||||
|
@ -28,9 +31,6 @@ import org.apache.activemq.store.PersistenceAdapter;
|
||||||
import org.apache.activemq.store.TopicMessageStore;
|
import org.apache.activemq.store.TopicMessageStore;
|
||||||
import org.apache.activemq.store.TransactionStore;
|
import org.apache.activemq.store.TransactionStore;
|
||||||
import org.apache.activemq.usage.SystemUsage;
|
import org.apache.activemq.usage.SystemUsage;
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Set;
|
|
||||||
/**
|
/**
|
||||||
* An implementation of {@link PersistenceAdapter} designed for use with a
|
* An implementation of {@link PersistenceAdapter} designed for use with a
|
||||||
* {@link Journal} and then check pointing asynchronously on a timeout with some
|
* {@link Journal} and then check pointing asynchronously on a timeout with some
|
||||||
|
@ -40,7 +40,7 @@ import java.util.Set;
|
||||||
* @version $Revision: 1.17 $
|
* @version $Revision: 1.17 $
|
||||||
*/
|
*/
|
||||||
public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
|
public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
|
||||||
private KahaDBStore letter = new KahaDBStore();
|
private final KahaDBStore letter = new KahaDBStore();
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -370,4 +370,19 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
||||||
public void setBrokerService(BrokerService brokerService) {
|
public void setBrokerService(BrokerService brokerService) {
|
||||||
letter.setBrokerService(brokerService);
|
letter.setBrokerService(brokerService);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the archiveDataLogs
|
||||||
|
*/
|
||||||
|
public boolean isArchiveDataLogs() {
|
||||||
|
return letter.isArchiveDataLogs();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param archiveDataLogs the archiveDataLogs to set
|
||||||
|
*/
|
||||||
|
public void setArchiveDataLogs(boolean archiveDataLogs) {
|
||||||
|
letter.setArchiveDataLogs(archiveDataLogs);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,7 +35,6 @@ import java.util.TreeMap;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.BrokerServiceAware;
|
import org.apache.activemq.broker.BrokerServiceAware;
|
||||||
import org.apache.activemq.command.ConnectionId;
|
import org.apache.activemq.command.ConnectionId;
|
||||||
|
@ -77,7 +76,6 @@ import org.apache.kahadb.util.Sequence;
|
||||||
import org.apache.kahadb.util.SequenceSet;
|
import org.apache.kahadb.util.SequenceSet;
|
||||||
import org.apache.kahadb.util.StringMarshaller;
|
import org.apache.kahadb.util.StringMarshaller;
|
||||||
import org.apache.kahadb.util.VariableMarshaller;
|
import org.apache.kahadb.util.VariableMarshaller;
|
||||||
import org.springframework.core.enums.LetterCodedLabeledEnum;
|
|
||||||
|
|
||||||
public class MessageDatabase implements BrokerServiceAware {
|
public class MessageDatabase implements BrokerServiceAware {
|
||||||
|
|
||||||
|
@ -159,6 +157,7 @@ public class MessageDatabase implements BrokerServiceAware {
|
||||||
protected File directory;
|
protected File directory;
|
||||||
protected Thread checkpointThread;
|
protected Thread checkpointThread;
|
||||||
protected boolean enableJournalDiskSyncs=true;
|
protected boolean enableJournalDiskSyncs=true;
|
||||||
|
protected boolean archiveDataLogs;
|
||||||
long checkpointInterval = 5*1000;
|
long checkpointInterval = 5*1000;
|
||||||
long cleanupInterval = 30*1000;
|
long cleanupInterval = 30*1000;
|
||||||
int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
|
int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
|
||||||
|
@ -173,6 +172,7 @@ public class MessageDatabase implements BrokerServiceAware {
|
||||||
private int indexCacheSize = 100;
|
private int indexCacheSize = 100;
|
||||||
private boolean checkForCorruptJournalFiles = false;
|
private boolean checkForCorruptJournalFiles = false;
|
||||||
private boolean checksumJournalFiles = false;
|
private boolean checksumJournalFiles = false;
|
||||||
|
|
||||||
|
|
||||||
public MessageDatabase() {
|
public MessageDatabase() {
|
||||||
}
|
}
|
||||||
|
@ -234,6 +234,7 @@ public class MessageDatabase implements BrokerServiceAware {
|
||||||
|
|
||||||
private void startCheckpoint() {
|
private void startCheckpoint() {
|
||||||
checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
|
checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
long lastCleanup = System.currentTimeMillis();
|
long lastCleanup = System.currentTimeMillis();
|
||||||
|
@ -510,6 +511,7 @@ public class MessageDatabase implements BrokerServiceAware {
|
||||||
|
|
||||||
final ArrayList<Long> matches = new ArrayList<Long>();
|
final ArrayList<Long> matches = new ArrayList<Long>();
|
||||||
sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
|
sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
|
||||||
|
@Override
|
||||||
protected void matched(Location key, Long value) {
|
protected void matched(Location key, Long value) {
|
||||||
matches.add(value);
|
matches.add(value);
|
||||||
}
|
}
|
||||||
|
@ -1375,6 +1377,7 @@ public class MessageDatabase implements BrokerServiceAware {
|
||||||
this.command = command;
|
this.command = command;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void execute(Transaction tx) throws IOException {
|
public void execute(Transaction tx) throws IOException {
|
||||||
upadateIndex(tx, command, location);
|
upadateIndex(tx, command, location);
|
||||||
}
|
}
|
||||||
|
@ -1392,6 +1395,7 @@ public class MessageDatabase implements BrokerServiceAware {
|
||||||
this.command = command;
|
this.command = command;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void execute(Transaction tx) throws IOException {
|
public void execute(Transaction tx) throws IOException {
|
||||||
updateIndex(tx, command, location);
|
updateIndex(tx, command, location);
|
||||||
}
|
}
|
||||||
|
@ -1420,6 +1424,7 @@ public class MessageDatabase implements BrokerServiceAware {
|
||||||
manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
|
manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
|
||||||
manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
|
manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
|
||||||
manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
|
manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
|
||||||
|
manager.setArchiveDataLogs(isArchiveDataLogs());
|
||||||
return manager;
|
return manager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1552,4 +1557,18 @@ public class MessageDatabase implements BrokerServiceAware {
|
||||||
public void setBrokerService(BrokerService brokerService) {
|
public void setBrokerService(BrokerService brokerService) {
|
||||||
this.brokerService = brokerService;
|
this.brokerService = brokerService;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the archiveDataLogs
|
||||||
|
*/
|
||||||
|
public boolean isArchiveDataLogs() {
|
||||||
|
return this.archiveDataLogs;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param archiveDataLogs the archiveDataLogs to set
|
||||||
|
*/
|
||||||
|
public void setArchiveDataLogs(boolean archiveDataLogs) {
|
||||||
|
this.archiveDataLogs = archiveDataLogs;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue