Bettter property names.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@741169 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2009-02-05 16:33:23 +00:00
parent 19c4316a1c
commit c0594251e2
4 changed files with 59 additions and 53 deletions

View File

@ -140,7 +140,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
store(command, isSyncWrites() && message.isResponseRequired()); store(command, isEnableJournalDiskSyncs() && message.isResponseRequired());
} }
@ -149,7 +149,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
command.setDestination(dest); command.setDestination(dest);
command.setMessageId(ack.getLastMessageId().toString()); command.setMessageId(ack.getLastMessageId().toString());
command.setTransactionInfo(createTransactionInfo(ack.getTransactionId()) ); command.setTransactionInfo(createTransactionInfo(ack.getTransactionId()) );
store(command, isSyncWrites() && ack.isResponseRequired()); store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired());
} }
public void removeAllMessages(ConnectionContext context) throws IOException { public void removeAllMessages(ConnectionContext context) throws IOException {
@ -282,14 +282,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
command.setRetroactive(retroactive); command.setRetroactive(retroactive);
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo); org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
store(command, isSyncWrites() && true); store(command, isEnableJournalDiskSyncs() && true);
} }
public void deleteSubscription(String clientId, String subscriptionName) throws IOException { public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
KahaSubscriptionCommand command = new KahaSubscriptionCommand(); KahaSubscriptionCommand command = new KahaSubscriptionCommand();
command.setDestination(dest); command.setDestination(dest);
command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName)); command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
store(command, isSyncWrites() && true); store(command, isEnableJournalDiskSyncs() && true);
} }
public SubscriptionInfo[] getAllSubscriptions() throws IOException { public SubscriptionInfo[] getAllSubscriptions() throws IOException {

View File

@ -146,9 +146,9 @@ public class MessageDatabase {
protected boolean deleteAllMessages; protected boolean deleteAllMessages;
protected File directory; protected File directory;
protected Thread checkpointThread; protected Thread checkpointThread;
protected boolean syncWrites=true; protected boolean enableJournalDiskSyncs=true;
int checkpointInterval = 5*1000; long checkpointInterval = 5*1000;
int cleanupInterval = 30*1000; long cleanupInterval = 30*1000;
protected AtomicBoolean started = new AtomicBoolean(); protected AtomicBoolean started = new AtomicBoolean();
protected AtomicBoolean opened = new AtomicBoolean(); protected AtomicBoolean opened = new AtomicBoolean();
@ -1182,9 +1182,7 @@ public class MessageDatabase {
// ///////////////////////////////////////////////////////////////// // /////////////////////////////////////////////////////////////////
private PageFile createPageFile() { private PageFile createPageFile() {
PageFile pf = new PageFile(directory, "db"); return new PageFile(directory, "db");
pf.setEnableAsyncWrites(!isSyncWrites());
return pf;
} }
private Journal createJournal() { private Journal createJournal() {
@ -1211,27 +1209,27 @@ public class MessageDatabase {
this.deleteAllMessages = deleteAllMessages; this.deleteAllMessages = deleteAllMessages;
} }
public boolean isSyncWrites() { public boolean isEnableJournalDiskSyncs() {
return syncWrites; return enableJournalDiskSyncs;
} }
public void setSyncWrites(boolean syncWrites) { public void setEnableJournalDiskSyncs(boolean syncWrites) {
this.syncWrites = syncWrites; this.enableJournalDiskSyncs = syncWrites;
} }
public int getCheckpointInterval() { public long getCheckpointInterval() {
return checkpointInterval; return checkpointInterval;
} }
public void setCheckpointInterval(int checkpointInterval) { public void setCheckpointInterval(long checkpointInterval) {
this.checkpointInterval = checkpointInterval; this.checkpointInterval = checkpointInterval;
} }
public int getCleanupInterval() { public long getCleanupInterval() {
return cleanupInterval; return cleanupInterval;
} }
public void setCleanupInterval(int cleanupInterval) { public void setCleanupInterval(long cleanupInterval) {
this.cleanupInterval = cleanupInterval; this.cleanupInterval = cleanupInterval;
} }

View File

@ -65,6 +65,8 @@ public class VerifySteadyEnqueueRate extends TestCase {
private void doTestEnqueue(final boolean transacted) throws Exception { private void doTestEnqueue(final boolean transacted) throws Exception {
final long min = 100; final long min = 100;
final AtomicLong total = new AtomicLong(0);
final AtomicLong slaViolations = new AtomicLong(0);
final AtomicLong max = new AtomicLong(0); final AtomicLong max = new AtomicLong(0);
long reportTime = 0; long reportTime = 0;
@ -81,16 +83,20 @@ public class VerifySteadyEnqueueRate extends TestCase {
long endT = System.currentTimeMillis(); long endT = System.currentTimeMillis();
long duration = endT - startT; long duration = endT - startT;
total.incrementAndGet();
if (duration > max.get()) { if (duration > max.get()) {
max.set(duration); max.set(duration);
} }
if (duration > min) { if (duration > min) {
System.err.println(Thread.currentThread().getName() slaViolations.incrementAndGet();
System.err.println("SLA violation @ "+Thread.currentThread().getName()
+ " " + " "
+ DateFormat.getTimeInstance().format( + DateFormat.getTimeInstance().format(
new Date(startT)) + " at message " new Date(startT)) + " at message "
+ i + " send time=" + duration); + i + " send time=" + duration
+ " - Total SLA violations: "+slaViolations.get()+"/"+total.get()+" ("+String.format("%.6f", 100.0*slaViolations.get()/total.get())+"%)");
} }
} }
@ -145,7 +151,13 @@ public class VerifySteadyEnqueueRate extends TestCase {
KahaDBStore kaha = new KahaDBStore(); KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(new File("target/activemq-data/kahadb")); kaha.setDirectory(new File("target/activemq-data/kahadb"));
kaha.deleteAllMessages(); kaha.deleteAllMessages();
kaha.getPageFile().setWriteBatchSize(10); kaha.setCleanupInterval(1000 * 60 * 60 * 60);
// The setEnableJournalDiskSyncs(false) setting is a little dangerous right now, as I have not verified
// what happens if the index is updated but a journal update is lost.
// Index is going to be in consistent, but can it be repaired?
kaha.setEnableJournalDiskSyncs(false);
kaha.getPageFile().setWriteBatchSize(100);
kaha.getPageFile().setEnableWriteThread(true);
broker.setPersistenceAdapter(kaha); broker.setPersistenceAdapter(kaha);
} }

View File

@ -26,12 +26,9 @@ import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
@ -39,7 +36,6 @@ import java.util.Properties;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Adler32; import java.util.zip.Adler32;
@ -48,7 +44,6 @@ import java.util.zip.Checksum;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.util.DataByteArrayOutputStream; import org.apache.kahadb.util.DataByteArrayOutputStream;
import org.apache.kahadb.util.IOExceptionSupport;
import org.apache.kahadb.util.IOHelper; import org.apache.kahadb.util.IOHelper;
import org.apache.kahadb.util.IntrospectionSupport; import org.apache.kahadb.util.IntrospectionSupport;
import org.apache.kahadb.util.LRUCache; import org.apache.kahadb.util.LRUCache;
@ -119,9 +114,9 @@ public class PageFile {
// page write failures.. // page write failures..
private boolean enableRecoveryFile=true; private boolean enableRecoveryFile=true;
// Will we sync writes to disk. Ensures that data will not be lost after a checkpoint() // Will we sync writes to disk. Ensures that data will not be lost after a checkpoint()
private boolean enableSyncedWrites=true; private boolean enableDiskSyncs=true;
// Will writes be done in an async thread? // Will writes be done in an async thread?
private boolean enableAsyncWrites=false; private boolean enabledWriteThread=false;
// These are used if enableAsyncWrites==true // These are used if enableAsyncWrites==true
private AtomicBoolean stopWriter = new AtomicBoolean(); private AtomicBoolean stopWriter = new AtomicBoolean();
@ -427,7 +422,7 @@ public class PageFile {
*/ */
public void flush() throws IOException { public void flush() throws IOException {
if( enableAsyncWrites && stopWriter.get() ) { if( enabledWriteThread && stopWriter.get() ) {
throw new IOException("Page file already stopped: checkpointing is not allowed"); throw new IOException("Page file already stopped: checkpointing is not allowed");
} }
@ -437,7 +432,7 @@ public class PageFile {
if( writes.isEmpty()) { if( writes.isEmpty()) {
return; return;
} }
if( enableAsyncWrites ) { if( enabledWriteThread ) {
if( this.checkpointLatch == null ) { if( this.checkpointLatch == null ) {
this.checkpointLatch = new CountDownLatch(1); this.checkpointLatch = new CountDownLatch(1);
} }
@ -591,17 +586,17 @@ public class PageFile {
/** /**
* @return Are page writes synced to disk? * @return Are page writes synced to disk?
*/ */
public boolean isEnableSyncedWrites() { public boolean isEnableDiskSyncs() {
return enableSyncedWrites; return enableDiskSyncs;
} }
/** /**
* Allows you enable syncing writes to disk. * Allows you enable syncing writes to disk.
* @param syncWrites * @param syncWrites
*/ */
public void setEnableSyncedWrites(boolean syncWrites) { public void setEnableDiskSyncs(boolean syncWrites) {
assertNotLoaded(); assertNotLoaded();
this.enableSyncedWrites = syncWrites; this.enableDiskSyncs = syncWrites;
} }
/** /**
@ -662,13 +657,13 @@ public class PageFile {
this.pageCacheSize = pageCacheSize; this.pageCacheSize = pageCacheSize;
} }
public boolean isEnableAsyncWrites() { public boolean isEnabledWriteThread() {
return enableAsyncWrites; return enabledWriteThread;
} }
public void setEnableAsyncWrites(boolean enableAsyncWrites) { public void setEnableWriteThread(boolean enableAsyncWrites) {
assertNotLoaded(); assertNotLoaded();
this.enableAsyncWrites = enableAsyncWrites; this.enabledWriteThread = enableAsyncWrites;
} }
public long getDiskSize() throws IOException { public long getDiskSize() throws IOException {
@ -700,6 +695,15 @@ public class PageFile {
this.recoveryFileMaxPageCount = recoveryFileMaxPageCount; this.recoveryFileMaxPageCount = recoveryFileMaxPageCount;
} }
public int getWriteBatchSize() {
return writeBatchSize;
}
public void setWriteBatchSize(int writeBatchSize) {
assertNotLoaded();
this.writeBatchSize = writeBatchSize;
}
/////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////
// Package Protected Methods exposed to Transaction // Package Protected Methods exposed to Transaction
/////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////
@ -817,7 +821,7 @@ public class PageFile {
// Once we start approaching capacity, notify the writer to start writing // Once we start approaching capacity, notify the writer to start writing
if( canStartWriteBatch() ) { if( canStartWriteBatch() ) {
if( enableAsyncWrites ) { if( enabledWriteThread ) {
writes.notify(); writes.notify();
} else { } else {
writeBatch(); writeBatch();
@ -828,7 +832,7 @@ public class PageFile {
private boolean canStartWriteBatch() { private boolean canStartWriteBatch() {
int capacityUsed = ((writes.size() * 100)/writeBatchSize); int capacityUsed = ((writes.size() * 100)/writeBatchSize);
if( enableAsyncWrites ) { if( enabledWriteThread ) {
// The constant 10 here controls how soon write batches start going to disk.. // The constant 10 here controls how soon write batches start going to disk..
// would be nice to figure out how to auto tune that value. Make to small and // would be nice to figure out how to auto tune that value. Make to small and
// we reduce through put because we are locking the write mutex too often doing writes // we reduce through put because we are locking the write mutex too often doing writes
@ -963,7 +967,7 @@ public class PageFile {
recoveryFile.write(w.diskBound, 0, pageSize); recoveryFile.write(w.diskBound, 0, pageSize);
} }
if (enableSyncedWrites) { if (enableDiskSyncs) {
// Sync to make sure recovery buffer writes land on disk.. // Sync to make sure recovery buffer writes land on disk..
recoveryFile.getFD().sync(); recoveryFile.getFD().sync();
} }
@ -978,7 +982,7 @@ public class PageFile {
} }
// Sync again // Sync again
if( enableSyncedWrites ) { if( enableDiskSyncs ) {
writeFile.getFD().sync(); writeFile.getFD().sync();
} }
@ -1077,7 +1081,7 @@ public class PageFile {
private void startWriter() { private void startWriter() {
synchronized( writes ) { synchronized( writes ) {
if( enableAsyncWrites ) { if( enabledWriteThread ) {
stopWriter.set(false); stopWriter.set(false);
writerThread = new Thread("KahaDB Page Writer") { writerThread = new Thread("KahaDB Page Writer") {
@Override @Override
@ -1092,7 +1096,7 @@ public class PageFile {
} }
private void stopWriter() throws InterruptedException { private void stopWriter() throws InterruptedException {
if( enableAsyncWrites ) { if( enabledWriteThread ) {
stopWriter.set(true); stopWriter.set(true);
writerThread.join(); writerThread.join();
} }
@ -1102,12 +1106,4 @@ public class PageFile {
return getMainPageFile(); return getMainPageFile();
} }
public int getWriteBatchSize() {
return writeBatchSize;
}
public void setWriteBatchSize(int writeBatchSize) {
this.writeBatchSize = writeBatchSize;
}
} }