mirror of https://github.com/apache/activemq.git
Added the ability to customize thw write batch size in the PageFile.
Added some diagnostic logging to see when store updates take too long and why. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@741137 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3c543a0cd0
commit
19c4316a1c
|
@ -415,6 +415,7 @@ public class MessageDatabase {
|
||||||
|
|
||||||
protected void checkpointCleanup(final boolean cleanup) {
|
protected void checkpointCleanup(final boolean cleanup) {
|
||||||
try {
|
try {
|
||||||
|
long start = System.currentTimeMillis();
|
||||||
synchronized (indexMutex) {
|
synchronized (indexMutex) {
|
||||||
if( !opened.get() ) {
|
if( !opened.get() ) {
|
||||||
return;
|
return;
|
||||||
|
@ -425,6 +426,10 @@ public class MessageDatabase {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
long end = System.currentTimeMillis();
|
||||||
|
if( end-start > 100 ) {
|
||||||
|
LOG.warn("KahaDB Cleanup took "+(end-start));
|
||||||
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
|
@ -457,12 +462,22 @@ public class MessageDatabase {
|
||||||
* durring a recovery process.
|
* durring a recovery process.
|
||||||
*/
|
*/
|
||||||
public Location store(JournalCommand data, boolean sync) throws IOException {
|
public Location store(JournalCommand data, boolean sync) throws IOException {
|
||||||
|
|
||||||
|
|
||||||
int size = data.serializedSizeFramed();
|
int size = data.serializedSizeFramed();
|
||||||
DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
|
DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
|
||||||
os.writeByte(data.type().getNumber());
|
os.writeByte(data.type().getNumber());
|
||||||
data.writeFramed(os);
|
data.writeFramed(os);
|
||||||
|
|
||||||
|
long start = System.currentTimeMillis();
|
||||||
Location location = journal.write(os.toByteSequence(), sync);
|
Location location = journal.write(os.toByteSequence(), sync);
|
||||||
|
long start2 = System.currentTimeMillis();
|
||||||
process(data, location);
|
process(data, location);
|
||||||
|
long end = System.currentTimeMillis();
|
||||||
|
if( end-start > 100 ) {
|
||||||
|
LOG.warn("KahaDB long enqueue time: Journal Add Took: "+(start2-start)+" ms, Index Update took "+(end-start2)+" ms");
|
||||||
|
}
|
||||||
|
|
||||||
metadata.lastUpdate = location;
|
metadata.lastUpdate = location;
|
||||||
return location;
|
return location;
|
||||||
}
|
}
|
||||||
|
|
|
@ -145,6 +145,7 @@ public class VerifySteadyEnqueueRate extends TestCase {
|
||||||
KahaDBStore kaha = new KahaDBStore();
|
KahaDBStore kaha = new KahaDBStore();
|
||||||
kaha.setDirectory(new File("target/activemq-data/kahadb"));
|
kaha.setDirectory(new File("target/activemq-data/kahadb"));
|
||||||
kaha.deleteAllMessages();
|
kaha.deleteAllMessages();
|
||||||
|
kaha.getPageFile().setWriteBatchSize(10);
|
||||||
broker.setPersistenceAdapter(kaha);
|
broker.setPersistenceAdapter(kaha);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -104,6 +104,9 @@ public class PageFile {
|
||||||
private int recoveryPageCount;
|
private int recoveryPageCount;
|
||||||
|
|
||||||
private AtomicBoolean loaded = new AtomicBoolean();
|
private AtomicBoolean loaded = new AtomicBoolean();
|
||||||
|
// The number of pages we are aiming to write every time we
|
||||||
|
// write to disk.
|
||||||
|
int writeBatchSize = 1000;
|
||||||
|
|
||||||
// We keep a cache of pages recently used?
|
// We keep a cache of pages recently used?
|
||||||
private LRUCache<Long, Page> pageCache;
|
private LRUCache<Long, Page> pageCache;
|
||||||
|
@ -824,7 +827,7 @@ public class PageFile {
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean canStartWriteBatch() {
|
private boolean canStartWriteBatch() {
|
||||||
int capacityUsed = ((writes.size() * 100)/1000);
|
int capacityUsed = ((writes.size() * 100)/writeBatchSize);
|
||||||
if( enableAsyncWrites ) {
|
if( enableAsyncWrites ) {
|
||||||
// The constant 10 here controls how soon write batches start going to disk..
|
// The constant 10 here controls how soon write batches start going to disk..
|
||||||
// would be nice to figure out how to auto tune that value. Make to small and
|
// would be nice to figure out how to auto tune that value. Make to small and
|
||||||
|
@ -1099,4 +1102,12 @@ public class PageFile {
|
||||||
return getMainPageFile();
|
return getMainPageFile();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getWriteBatchSize() {
|
||||||
|
return writeBatchSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setWriteBatchSize(int writeBatchSize) {
|
||||||
|
this.writeBatchSize = writeBatchSize;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue