mirror of https://github.com/apache/activemq.git
expose journal writeBatchSize
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@813467 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ce4c065f75
commit
561cda1e45
|
@ -38,8 +38,6 @@ import org.apache.kahadb.util.LinkedNodeList;
|
||||||
*/
|
*/
|
||||||
class DataFileAppender {
|
class DataFileAppender {
|
||||||
|
|
||||||
protected static final int DEFAULT_MAX_BATCH_SIZE = 1024 * 1024 * 4;
|
|
||||||
|
|
||||||
protected final Journal journal;
|
protected final Journal journal;
|
||||||
protected final Map<WriteKey, WriteCommand> inflightWrites;
|
protected final Map<WriteKey, WriteCommand> inflightWrites;
|
||||||
protected final Object enqueueMutex = new Object() {
|
protected final Object enqueueMutex = new Object() {
|
||||||
|
@ -49,7 +47,7 @@ class DataFileAppender {
|
||||||
protected boolean shutdown;
|
protected boolean shutdown;
|
||||||
protected IOException firstAsyncException;
|
protected IOException firstAsyncException;
|
||||||
protected final CountDownLatch shutdownDone = new CountDownLatch(1);
|
protected final CountDownLatch shutdownDone = new CountDownLatch(1);
|
||||||
protected int maxWriteBatchSize = DEFAULT_MAX_BATCH_SIZE;
|
protected int maxWriteBatchSize;
|
||||||
|
|
||||||
private boolean running;
|
private boolean running;
|
||||||
private Thread thread;
|
private Thread thread;
|
||||||
|
@ -145,6 +143,7 @@ class DataFileAppender {
|
||||||
/**
 * Creates an appender bound to the given journal.
 *
 * Caches the journal's shared in-flight write map and adopts the
 * journal's configured write batch size as this appender's maximum
 * batch size (replacing the previously hard-coded default).
 *
 * @param dataManager the owning journal this appender writes to
 */
public DataFileAppender(Journal dataManager) {
    this.journal = dataManager;
    // Read settings straight off the constructor argument; identical to
    // going through this.journal, which was just assigned above.
    this.inflightWrites = dataManager.getInflightWrites();
    this.maxWriteBatchSize = dataManager.getWriteBatchSize();
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -73,7 +73,8 @@ public class Journal {
|
||||||
public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
|
public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
|
||||||
public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
|
public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
|
||||||
public static final int PREFERED_DIFF = 1024 * 512;
|
public static final int PREFERED_DIFF = 1024 * 512;
|
||||||
|
public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(Journal.class);
|
private static final Log LOG = LogFactory.getLog(Journal.class);
|
||||||
|
|
||||||
protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
|
protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
|
||||||
|
@ -86,7 +87,8 @@ public class Journal {
|
||||||
|
|
||||||
protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
|
protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
|
||||||
protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
|
protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
|
||||||
|
protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE;
|
||||||
|
|
||||||
protected DataFileAppender appender;
|
protected DataFileAppender appender;
|
||||||
protected DataFileAccessorPool accessorPool;
|
protected DataFileAccessorPool accessorPool;
|
||||||
|
|
||||||
|
@ -102,6 +104,8 @@ public class Journal {
|
||||||
protected boolean checksum;
|
protected boolean checksum;
|
||||||
protected boolean checkForCorruptionOnStartup;
|
protected boolean checkForCorruptionOnStartup;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
public synchronized void start() throws IOException {
|
public synchronized void start() throws IOException {
|
||||||
if (started) {
|
if (started) {
|
||||||
return;
|
return;
|
||||||
|
@ -175,9 +179,6 @@ public class Journal {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Location recoveryCheck(DataFile dataFile) throws IOException {
|
protected Location recoveryCheck(DataFile dataFile) throws IOException {
|
||||||
byte controlRecord[] = new byte[BATCH_CONTROL_RECORD_SIZE];
|
|
||||||
DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord);
|
|
||||||
|
|
||||||
Location location = new Location();
|
Location location = new Location();
|
||||||
location.setDataFileId(dataFile.getDataFileId());
|
location.setDataFileId(dataFile.getDataFileId());
|
||||||
location.setOffset(0);
|
location.setOffset(0);
|
||||||
|
@ -707,4 +708,12 @@ public class Journal {
|
||||||
public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) {
|
public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) {
|
||||||
this.checkForCorruptionOnStartup = checkForCorruptionOnStartup;
|
this.checkForCorruptionOnStartup = checkForCorruptionOnStartup;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setWriteBatchSize(int writeBatchSize) {
|
||||||
|
this.writeBatchSize = writeBatchSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getWriteBatchSize() {
|
||||||
|
return writeBatchSize;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue