ARTEMIS-319 Improving files allocation and implementing journal-pool-files

https://issues.apache.org/jira/browse/ARTEMIS-319
This commit is contained in:
Clebert Suconic 2015-11-24 19:02:42 -05:00
parent 0b22433cc5
commit 351bcfc9f9
46 changed files with 1199 additions and 199 deletions

View File

@ -53,7 +53,7 @@ public final class CompactJournal extends LockAbstract {
final IOCriticalErrorListener listener) throws Exception {
NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, listener, 1);
JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
JournalImpl journal = new JournalImpl(fileSize, minFiles, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
journal.start();

View File

@ -107,7 +107,7 @@ public class DecodeJournal extends LockAbstract {
NIOSequentialFileFactory nio = new NIOSequentialFileFactory(new File(directory), null, 1);
JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
JournalImpl journal = new JournalImpl(fileSize, minFiles, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
if (journal.orderFiles().size() != 0) {
throw new IllegalStateException("Import needs to create a brand new journal");

View File

@ -101,7 +101,7 @@ public class EncodeJournal extends LockAbstract {
final PrintStream out) throws Exception {
NIOSequentialFileFactory nio = new NIOSequentialFileFactory(new File(directory), null, 1);
JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
JournalImpl journal = new JournalImpl(fileSize, minFiles, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
List<JournalFile> files = journal.orderFiles();

View File

@ -310,7 +310,7 @@ public final class XmlDataExporter extends LockAbstract {
private void getJmsBindings() throws Exception {
SequentialFileFactory bindingsJMS = new NIOSequentialFileFactory(config.getBindingsLocation(), 1);
Journal jmsJournal = new JournalImpl(1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), bindingsJMS, "activemq-jms", "jms", 1);
Journal jmsJournal = new JournalImpl(1024 * 1024, 2, 2, config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), bindingsJMS, "activemq-jms", "jms", 1);
jmsJournal.start();

View File

@ -43,7 +43,9 @@ under the License.
<large-messages-directory>${data.dir}/large-messages</large-messages-directory>
<journal-min-files>10</journal-min-files>
<journal-min-files>2</journal-min-files>
<journal-pool-files>-1</journal-pool-files>
${journal-buffer.settings}
${connector-config.settings}
<acceptors>

View File

@ -99,6 +99,7 @@ public final class ActiveMQDefaultConfiguration {
// These defaults are applied depending on whether the journal type
// is NIO or AIO.
private static int DEFAULT_JOURNAL_MAX_IO_AIO = 500;
private static int DEFAULT_JOURNAL_POOL_FILES = -1;
private static int DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO = ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO;
private static int DEFAULT_JOURNAL_BUFFER_SIZE_AIO = ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO;
private static int DEFAULT_JOURNAL_MAX_IO_NIO = 1;
@ -679,6 +680,14 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_JOURNAL_MIN_FILES;
}
/**
* How many journal files can be resued
* @return
*/
public static int getDefaultJournalPoolFiles() {
return DEFAULT_JOURNAL_POOL_FILES;
}
/**
* The percentage of live data on which we consider compacting the journal
*/

View File

@ -86,7 +86,7 @@ public final class JMSJournalStorageManagerImpl implements JMSStorageManager {
SequentialFileFactory bindingsJMS = new NIOSequentialFileFactory(config.getBindingsLocation(), 1);
Journal localJMS = new JournalImpl(1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), bindingsJMS, "activemq-jms", "jms", 1);
Journal localJMS = new JournalImpl(1024 * 1024, 2, config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), bindingsJMS, "activemq-jms", "jms", 1);
if (replicator != null) {
jmsJournal = new ReplicatedJournal((byte) 2, localJMS, replicator);

View File

@ -79,9 +79,10 @@ public class AIOSequentialFile extends AbstractSequentialFile {
@Override
public int getAlignment() {
checkOpened();
return aioFile.getBlockSize();
// TODO: get the alignment from the file system, but we have to cache this, we can't call it every time
/* checkOpened();
return aioFile.getBlockSize(); */
return 512;
}
@Override

View File

@ -73,6 +73,8 @@ public class JournalFilesRepository {
private final int minFiles;
private final int poolSize;
private final int fileSize;
private final String filePrefix;
@ -104,7 +106,8 @@ public class JournalFilesRepository {
final int userVersion,
final int maxAIO,
final int fileSize,
final int minFiles) {
final int minFiles,
final int poolSize) {
if (filePrefix == null) {
throw new IllegalArgumentException("filePrefix cannot be null");
}
@ -120,6 +123,7 @@ public class JournalFilesRepository {
this.fileExtension = fileExtension;
this.minFiles = minFiles;
this.fileSize = fileSize;
this.poolSize = poolSize;
this.userVersion = userVersion;
this.journal = journal;
}
@ -358,7 +362,7 @@ public class JournalFilesRepository {
ActiveMQJournalLogger.LOGGER.deletingFile(file);
file.getFile().delete();
}
else if (!checkDelete || (freeFilesCount.get() + dataFiles.size() + 1 + openedFiles.size() < minFiles)) {
else if (!checkDelete || (freeFilesCount.get() + dataFiles.size() + 1 + openedFiles.size() < poolSize) || (poolSize < 0)) {
// Re-initialise it
if (JournalFilesRepository.trace) {
@ -378,7 +382,7 @@ public class JournalFilesRepository {
if (trace) {
ActiveMQJournalLogger.LOGGER.trace("DataFiles.size() = " + dataFiles.size());
ActiveMQJournalLogger.LOGGER.trace("openedFiles.size() = " + openedFiles.size());
ActiveMQJournalLogger.LOGGER.trace("minfiles = " + minFiles);
ActiveMQJournalLogger.LOGGER.trace("minfiles = " + minFiles + ", poolSize = " + poolSize);
ActiveMQJournalLogger.LOGGER.trace("Free Files = " + freeFilesCount.get());
ActiveMQJournalLogger.LOGGER.trace("File " + file +
" being deleted as freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() (" +

View File

@ -40,21 +40,20 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.LoaderCallback;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.TestableJournal;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
@ -193,7 +192,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// Lock used during the append of records
// This lock doesn't represent a global lock.
// After a record is appended, the usedFile can't be changed until the positives and negatives are updated
private final ReentrantLock lockAppend = new ReentrantLock();
private final Object lockAppend = new Object();
/**
* We don't lock the journal during the whole compacting operation. During compacting we only
@ -209,23 +208,27 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
private volatile JournalState state = JournalState.STOPPED;
private volatile int compactCount = 0;
private final Reclaimer reclaimer = new Reclaimer();
// Constructors --------------------------------------------------
public JournalImpl(final int fileSize,
final int minFiles,
final int poolSize,
final int compactMinFiles,
final int compactPercentage,
final SequentialFileFactory fileFactory,
final String filePrefix,
final String fileExtension,
final int maxAIO) {
this(fileSize, minFiles, compactMinFiles, compactPercentage, fileFactory, filePrefix, fileExtension, maxAIO, 0);
this(fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, fileFactory, filePrefix, fileExtension, maxAIO, 0);
}
public JournalImpl(final int fileSize,
final int minFiles,
final int poolSize,
final int compactMinFiles,
final int compactPercentage,
final SequentialFileFactory fileFactory,
@ -234,6 +237,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final int maxAIO,
final int userVersion) {
super(fileFactory.isSupportsCallbacks(), fileSize);
if (fileSize % fileFactory.getAlignment() != 0) {
throw new IllegalArgumentException("Invalid journal-file-size " + fileSize + ", It should be multiple of " +
fileFactory.getAlignment());
@ -257,7 +261,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
this.fileFactory = fileFactory;
filesRepository = new JournalFilesRepository(fileFactory, this, filePrefix, fileExtension, userVersion, maxAIO, fileSize, minFiles);
filesRepository = new JournalFilesRepository(fileFactory, this, filePrefix, fileExtension, userVersion, maxAIO, fileSize, minFiles, poolSize);
this.userVersion = userVersion;
}
@ -715,8 +719,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
callback.storeLineUp();
}
lockAppend.lock();
try {
synchronized (lockAppend) {
JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback);
if (JournalImpl.TRACE_RECORDS) {
@ -729,9 +732,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize()));
}
finally {
lockAppend.unlock();
}
}
finally {
journalLock.readLock().unlock();
@ -763,8 +763,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
callback.storeLineUp();
}
lockAppend.lock();
try {
synchronized (lockAppend) {
JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback);
if (JournalImpl.TRACE_RECORDS) {
@ -784,9 +783,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize());
}
}
finally {
lockAppend.unlock();
}
}
finally {
journalLock.readLock().unlock();
@ -821,8 +817,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
callback.storeLineUp();
}
lockAppend.lock();
try {
synchronized (lockAppend) {
JournalFile usedFile = appendRecord(deleteRecord, false, sync, null, callback);
if (JournalImpl.TRACE_RECORDS) {
@ -839,9 +834,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
}
finally {
lockAppend.unlock();
}
}
finally {
journalLock.readLock().unlock();
@ -862,8 +854,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
JournalTransaction tx = getTransactionInfo(txID);
lockAppend.lock();
try {
synchronized (lockAppend) {
JournalFile usedFile = appendRecord(addRecord, false, false, tx, null);
if (JournalImpl.TRACE_RECORDS) {
@ -878,9 +869,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
tx.addPositive(usedFile, id, addRecord.getEncodeSize());
}
finally {
lockAppend.unlock();
}
}
finally {
journalLock.readLock().unlock();
@ -911,8 +899,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
JournalTransaction tx = getTransactionInfo(txID);
lockAppend.lock();
try {
synchronized (lockAppend) {
JournalFile usedFile = appendRecord(updateRecordTX, false, false, tx, null);
if (JournalImpl.TRACE_RECORDS) {
@ -927,9 +914,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
tx.addPositive(usedFile, id, updateRecordTX.getEncodeSize());
}
finally {
lockAppend.unlock();
}
}
finally {
journalLock.readLock().unlock();
@ -949,8 +933,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
JournalTransaction tx = getTransactionInfo(txID);
lockAppend.lock();
try {
synchronized (lockAppend) {
JournalFile usedFile = appendRecord(deleteRecordTX, false, false, tx, null);
if (JournalImpl.TRACE_RECORDS) {
@ -963,9 +946,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
tx.addNegative(usedFile, id);
}
finally {
lockAppend.unlock();
}
}
finally {
journalLock.readLock().unlock();
@ -1003,8 +983,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
callback.storeLineUp();
}
lockAppend.lock();
try {
synchronized (lockAppend) {
JournalFile usedFile = appendRecord(prepareRecord, true, sync, tx, callback);
if (JournalImpl.TRACE_RECORDS) {
@ -1013,9 +992,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
tx.prepare(usedFile);
}
finally {
lockAppend.unlock();
}
}
finally {
@ -1053,8 +1029,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
callback.storeLineUp();
}
lockAppend.lock();
try {
synchronized (lockAppend) {
JournalFile usedFile = appendRecord(commitRecord, true, sync, tx, callback);
if (JournalImpl.TRACE_RECORDS) {
@ -1063,9 +1038,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
tx.commit(usedFile);
}
finally {
lockAppend.unlock();
}
}
finally {
@ -1094,15 +1066,11 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
callback.storeLineUp();
}
lockAppend.lock();
try {
synchronized (lockAppend) {
JournalFile usedFile = appendRecord(rollbackRecord, false, sync, tx, callback);
tx.rollback(usedFile);
}
finally {
lockAppend.unlock();
}
}
finally {
@ -1289,11 +1257,17 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
* Note: only synchronized methods on journal are methods responsible for the life-cycle such as
* stop, start records will still come as this is being executed
*/
public synchronized void compact() throws Exception {
if (compactor != null) {
throw new IllegalStateException("There is pending compacting operation");
}
if (ActiveMQJournalLogger.LOGGER.isDebugEnabled()) {
ActiveMQJournalLogger.LOGGER.debug("JournalImpl::compact compacting journal " + (++compactCount));
}
compactorLock.writeLock().lock();
try {
ArrayList<JournalFile> dataFilesToProcess = new ArrayList<>(filesRepository.getDataFilesCount());
@ -2067,14 +2041,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
public void forceMoveNextFile() throws Exception {
journalLock.readLock().lock();
try {
lockAppend.lock();
try {
synchronized (lockAppend) {
moveNextFile(false);
debugWait();
}
finally {
lockAppend.unlock();
}
}
finally {
journalLock.readLock().unlock();
@ -2131,9 +2101,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
journalLock.writeLock().lock();
try {
lockAppend.lock();
try {
synchronized (lockAppend) {
setJournalState(JournalState.STOPPED);
@ -2172,9 +2140,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
currentFile = null;
}
finally {
lockAppend.unlock();
}
}
finally {
journalLock.writeLock().unlock();
@ -2666,33 +2631,31 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
@Override
public void run() {
lockAppend.lock();
try {
synchronized (lockAppend) {
try {
final ByteArrayEncoding byteEncoder = new ByteArrayEncoding(new byte[128 * 1024]);
final ByteArrayEncoding byteEncoder = new ByteArrayEncoding(new byte[128 * 1024]);
JournalInternalRecord blastRecord = new JournalInternalRecord() {
JournalInternalRecord blastRecord = new JournalInternalRecord() {
@Override
public int getEncodeSize() {
return byteEncoder.getEncodeSize();
@Override
public int getEncodeSize() {
return byteEncoder.getEncodeSize();
}
@Override
public void encode(final ActiveMQBuffer buffer) {
byteEncoder.encode(buffer);
}
};
for (int i = 0; i < pages; i++) {
appendRecord(blastRecord, false, false, null, null);
}
@Override
public void encode(final ActiveMQBuffer buffer) {
byteEncoder.encode(buffer);
}
};
for (int i = 0; i < pages; i++) {
appendRecord(blastRecord, false, false, null, null);
}
}
catch (Exception e) {
ActiveMQJournalLogger.LOGGER.failedToPerfBlast(e);
}
finally {
lockAppend.unlock();
catch (Exception e) {
ActiveMQJournalLogger.LOGGER.failedToPerfBlast(e);
}
}
}
}
@ -2863,4 +2826,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
throw new RuntimeException(e);
}
}
/**
* For tests only
*/
public int getCompactCount() {
return compactCount;
}
}

BIN
artemis-native/bin/libartemis-native-64.so Executable file → Normal file

Binary file not shown.

View File

@ -63,6 +63,9 @@ struct io_control {
int dumbWriteHandler = 0;
char dumbPath[PATH_MAX];
#define ONE_MEGA 1048576l
void * oneMegaBuffer = 0;
jclass submitClass = NULL;
jmethodID errorMethod = NULL;
@ -121,6 +124,12 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
if ((*vm)->GetEnv(vm, (void**) &env, JNI_VERSION_1_6) != JNI_OK) {
return JNI_ERR;
} else {
if (posix_memalign(&oneMegaBuffer, 512, ONE_MEGA) != 0)
{
fprintf(stderr, "Could not allocate the 1 Mega Buffer for initializing files\n");
return JNI_ERR;
}
memset(oneMegaBuffer, 0, ONE_MEGA);
sprintf (dumbPath, "%s/artemisJLHandler_XXXXXX", P_tmpdir);
dumbWriteHandler = mkstemp (dumbPath);
@ -219,6 +228,8 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) {
} else {
closeDumbHandlers();
free(oneMegaBuffer);
// delete global references so the GC can collect them
if (runtimeExceptionClass != NULL) {
(*env)->DeleteGlobalRef(env, runtimeExceptionClass);
@ -757,17 +768,34 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_fa
JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_fill
(JNIEnv * env, jclass clazz, jint fd, jlong size)
{
void * preAllocBuffer = 0;
if (posix_memalign(&preAllocBuffer, 512, size) != 0)
int i;
int blocks = size / ONE_MEGA;
int rest = size % ONE_MEGA;
#ifdef DEBUG
fprintf (stderr, "blocks = %d, rest=%d\n", blocks, rest);
#endif
lseek (fd, 0, SEEK_SET);
for (i = 0; i < blocks; i++)
{
throwOutOfMemoryError(env);
return;
if (write(fd, oneMegaBuffer, ONE_MEGA) < 0)
{
throwIOException(env, "Cannot initialize file");
return;
}
}
if (rest != 0l)
{
if (write(fd, oneMegaBuffer, rest) < 0)
{
throwIOException(env, "Cannot initialize file");
return;
}
}
memset(preAllocBuffer, 0, size);
lseek (fd, 0, SEEK_SET);
write(fd, preAllocBuffer, size);
lseek (fd, 0, SEEK_SET);
free (preAllocBuffer);
}
JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_memsetBuffer

View File

@ -49,7 +49,7 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
* <br>
* Or else the native module won't be loaded because of version mismatches
*/
private static final int EXPECTED_NATIVE_VERSION = 3;
private static final int EXPECTED_NATIVE_VERSION = 5;
private static boolean loaded = false;

View File

@ -48,9 +48,6 @@ public final class LibaioFile<Callback extends SubmitInfo> {
return LibaioContext.lock(fd);
}
/**
* {@inheritDoc}
*/
public void close() throws IOException {
open = false;
LibaioContext.close(fd);

View File

@ -93,12 +93,21 @@ public class LibaioTest {
}
@Test
public void testInitAndFallocate() throws Exception {
LibaioFile fileDescriptor = control.openFile(temporaryFolder.newFile("test.bin"), true);
fileDescriptor.fallocate(1024 * 1024);
public void testInitAndFallocate10M() throws Exception {
testInit(10 * 1024 * 1024);
}
ByteBuffer buffer = fileDescriptor.newBuffer(1024 * 1024);
fileDescriptor.read(0, 1024 * 1024, buffer, new TestInfo());
@Test
public void testInitAndFallocate10M100K() throws Exception {
testInit(10 * 1024 * 1024 + 100 * 1024);
}
private void testInit(int size) throws IOException {
LibaioFile fileDescriptor = control.openFile(temporaryFolder.newFile("test.bin"), true);
fileDescriptor.fallocate(size);
ByteBuffer buffer = fileDescriptor.newBuffer(size);
fileDescriptor.read(0, size, buffer, new TestInfo());
TestInfo[] callbacks = new TestInfo[1];
control.poll(callbacks, 1, 1);
@ -108,17 +117,27 @@ public class LibaioTest {
buffer.position(0);
LibaioFile fileDescriptor2 = control.openFile(temporaryFolder.newFile("test2.bin"), true);
fileDescriptor2.fill(1024 * 1024);
fileDescriptor2.read(0, 1024 * 1024, buffer, new TestInfo());
fileDescriptor2.fill(size);
fileDescriptor2.read(0, size, buffer, new TestInfo());
control.poll(callbacks, 1, 1);
for (int i = 0; i < 1024 * 1024; i++) {
for (int i = 0; i < size; i++) {
Assert.assertEquals(0, buffer.get());
}
LibaioContext.freeBuffer(buffer);
}
@Test
public void testInitAndFallocate10K() throws Exception {
testInit(10 * 1024);
}
@Test
public void testInitAndFallocate20K() throws Exception {
testInit(20 * 1024);
}
@Test
public void testSubmitWriteOnTwoFiles() throws Exception {

View File

@ -546,6 +546,13 @@ public interface Configuration {
*/
Configuration setJournalCompactMinFiles(int minFiles);
/** Number of files that would be acceptable to keep on a pool. Default value is {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_JOURNAL_POOL_SIZE}.*/
int getJournalPoolFiles();
/** Number of files that would be acceptable to keep on a pool. Default value is {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_JOURNAL_POOL_SIZE}.*/
Configuration setJournalPoolFiles(int poolSize);
/**
* Returns the percentage of live data before compacting the journal. <br>
* Default value is {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_JOURNAL_COMPACT_PERCENTAGE}.

View File

@ -151,6 +151,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
protected int journalFileSize = ActiveMQDefaultConfiguration.getDefaultJournalFileSize();
protected int journalPoolFiles = ActiveMQDefaultConfiguration.getDefaultJournalPoolFiles();
protected int journalMinFiles = ActiveMQDefaultConfiguration.getDefaultJournalMinFiles();
// AIO and NIO need different values for these attributes
@ -669,6 +671,18 @@ public class ConfigurationImpl implements Configuration, Serializable {
return this;
}
@Override
public int getJournalPoolFiles() {
return journalPoolFiles;
}
@Override
public Configuration setJournalPoolFiles(int poolSize) {
this.journalPoolFiles = poolSize;
return this;
}
@Override
public int getJournalMinFiles() {
return journalMinFiles;

View File

@ -470,6 +470,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
config.setJournalMinFiles(getInteger(e, "journal-min-files", config.getJournalMinFiles(), Validators.GT_ZERO));
config.setJournalPoolFiles(getInteger(e, "journal-pool-files", config.getJournalPoolFiles(), Validators.MINUS_ONE_OR_GT_ZERO));
config.setJournalCompactMinFiles(getInteger(e, "journal-compact-min-files", config.getJournalCompactMinFiles(), Validators.GE_ZERO));
config.setJournalCompactPercentage(getInteger(e, "journal-compact-percentage", config.getJournalCompactPercentage(), Validators.PERCENTAGE));

View File

@ -106,7 +106,7 @@ public final class DescribeJournal {
SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir, null, 1);
JournalImpl bindings = new JournalImpl(1024 * 1024, 2, -1, 0, bindingsFF, "activemq-bindings", "bindings", 1);
JournalImpl bindings = new JournalImpl(1024 * 1024, 2, 2, -1, 0, bindingsFF, "activemq-bindings", "bindings", 1);
describeJournal(bindingsFF, bindings, bindingsDir);
}
@ -117,7 +117,7 @@ public final class DescribeJournal {
// Will use only default values. The load function should adapt to anything different
ConfigurationImpl defaultValues = new ConfigurationImpl();
JournalImpl messagesJournal = new JournalImpl(defaultValues.getJournalFileSize(), defaultValues.getJournalMinFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1);
JournalImpl messagesJournal = new JournalImpl(defaultValues.getJournalFileSize(), defaultValues.getJournalMinFiles(), defaultValues.getJournalPoolFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1);
return describeJournal(messagesFF, messagesJournal, messagesDir);
}

View File

@ -231,7 +231,7 @@ public class JournalStorageManager implements StorageManager {
SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(config.getBindingsLocation(), criticalErrorListener, config.getJournalMaxIO_NIO());
Journal localBindings = new JournalImpl(1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), bindingsFF, "activemq-bindings", "bindings", 1);
Journal localBindings = new JournalImpl(1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactPercentage(), bindingsFF, "activemq-bindings", "bindings", 1);
bindingsJournal = localBindings;
originalBindingsJournal = localBindings;
@ -255,7 +255,7 @@ public class JournalStorageManager implements StorageManager {
idGenerator = new BatchingIDGenerator(0, JournalStorageManager.CHECKPOINT_BATCH_SIZE, this);
Journal localMessage = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), journalFF, "activemq-data", "amq", config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO() : config.getJournalMaxIO_NIO());
Journal localMessage = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), journalFF, "activemq-data", "amq", config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO() : config.getJournalMaxIO_NIO());
messageJournal = localMessage;
originalMessageJournal = localMessage;

View File

@ -593,6 +593,14 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="journal-pool-files" type="xsd:int" default="-1" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
how many journal files to pre-create
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="journal-compact-percentage" type="xsd:int" default="30" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>

View File

@ -1630,7 +1630,7 @@ public abstract class ActiveMQTestBase extends Assert {
try {
SequentialFileFactory messagesFF = new NIOSequentialFileFactory(new File(getJournalDir()), null, 1);
messagesJournal = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1);
messagesJournal = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1);
final List<RecordInfo> committedRecords = new LinkedList<>();
final List<PreparedTransactionInfo> preparedTransactions = new LinkedList<>();
@ -1664,7 +1664,7 @@ public abstract class ActiveMQTestBase extends Assert {
final HashMap<Integer, AtomicInteger> recordsType = new HashMap<>();
SequentialFileFactory messagesFF = new NIOSequentialFileFactory(config.getJournalLocation(), null, 1);
JournalImpl messagesJournal = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1);
JournalImpl messagesJournal = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1);
List<JournalFile> filesToRead = messagesJournal.orderFiles();
for (JournalFile file : filesToRead) {
@ -1701,11 +1701,11 @@ public abstract class ActiveMQTestBase extends Assert {
if (messageJournal) {
ff = new NIOSequentialFileFactory(config.getJournalLocation(), null, 1);
journal = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), 0, 0, ff, "activemq-data", "amq", 1);
journal = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), 0, 0, ff, "activemq-data", "amq", 1);
}
else {
ff = new NIOSequentialFileFactory(config.getBindingsLocation(), null, 1);
journal = new JournalImpl(1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), ff, "activemq-bindings", "bindings", 1);
journal = new JournalImpl(1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactPercentage(), ff, "activemq-bindings", "bindings", 1);
}
journal.start();

View File

@ -54,6 +54,7 @@ Name | Description
[journal-file-size](persistence.md) | the size (in bytes) of each journal file. Default=10485760 (10 MB)
[journal-max-io](persistence.md#configuring.message.journal.journal-max-io) | the maximum number of write requests that can be in the AIO queue at any one time. Default is 500 for AIO and 1 for NIO.
[journal-min-files](persistence.md#configuring.message.journal.journal-min-files) | how many journal files to pre-create. Default=2
[journal-pool-files](persistence.md#configuring.message.journal.journal-pool-files) | -1 (default) means no Limit. The system will create as many files as needed however when reclaiming files it will shrink back to the `journal-pool-files`
[journal-sync-non-transactional](persistence.md) | if true wait for non transaction data to be synced to the journal before returning response to client. Default=true
[journal-sync-transactional](persistence.md) | if true wait for transaction data to be synchronized to the journal before returning response to client. Default=true
[journal-type](persistence.md) | the type of journal to use. Default=ASYNCIO

View File

@ -218,6 +218,18 @@ The message journal is configured using the following attributes in
steady state you should tune this number of files to match that
total amount of data.
- `journal-pool-files`
The system will create as many files as needed however when reclaiming files
it will shrink back to the `journal-pool-files`.
The default to this parameter is -1, meaning it will never delete files on the journal once created.
Notice that the system can't grow infinitely as you are still required to use paging for destinations that can
grow indefinitely.
Notice: in case you get too many files you can use [compacting](tools.md).
- `journal-max-io`
Write requests are queued up before being submitted to the system

View File

@ -47,24 +47,31 @@ OPTIONS
For a full list of data tools commands available use:
```
$ ./artemis help data
NAME
artemis data - data tools like (print|exp|imp|exp|encode|decode)
(example ./artemis data print)
artemis data - data tools group
(print|exp|imp|exp|encode|decode|compact) (example ./artemis data print)
SYNOPSIS
artemis data
artemis data decode [--prefix <prefix>] [--directory <directory>]
[--suffix <suffix>] [--file-size <size>]
artemis data encode [--prefix <prefix>] [--directory <directory>]
[--suffix <suffix>] [--file-size <size>]
artemis data exp [--bindings <binding>]
[--large-messages <largeMessges>] [--paging <paging>]
[--journal <journal>]
artemis data imp [--password <password>] [--port <port>] [--host <host>]
[--user <user>] [--transaction]
artemis data print [--bindings <binding>] [--paging <paging>]
[--journal <journal>]
artemis data compact [--broker <brokerConfig>] [--verbose]
[--paging <paging>] [--journal <journal>]
[--large-messages <largeMessges>] [--bindings <binding>]
artemis data decode [--broker <brokerConfig>] [--suffix <suffix>]
[--verbose] [--paging <paging>] [--prefix <prefix>] [--file-size <size>]
[--directory <directory>] --input <input> [--journal <journal>]
[--large-messages <largeMessges>] [--bindings <binding>]
artemis data encode [--directory <directory>] [--broker <brokerConfig>]
[--suffix <suffix>] [--verbose] [--paging <paging>] [--prefix <prefix>]
[--file-size <size>] [--journal <journal>]
[--large-messages <largeMessges>] [--bindings <binding>]
artemis data exp [--broker <brokerConfig>] [--verbose]
[--paging <paging>] [--journal <journal>]
[--large-messages <largeMessges>] [--bindings <binding>]
artemis data imp [--host <host>] [--verbose] [--port <port>]
[--password <password>] [--transaction] --input <input> [--user <user>]
artemis data print [--broker <brokerConfig>] [--verbose]
[--paging <paging>] [--journal <journal>]
[--large-messages <largeMessges>] [--bindings <binding>]
COMMANDS
With no arguments, Display help information
@ -73,73 +80,145 @@ COMMANDS
Print data records information (WARNING: don't use while a
production server is running)
With --bindings option, The folder used for bindings (default
../data/bindings)
With --broker option, This would override the broker configuration
from the bootstrap
With --paging option, The folder used for paging (default
../data/paging)
With --verbose option, Adds more information on the execution
With --paging option, The folder used for paging (default from
broker.xml)
With --journal option, The folder used for messages journal (default
../data/journal)
from broker.xml)
With --large-messages option, The folder used for large-messages
(default from broker.xml)
With --bindings option, The folder used for bindings (default from
broker.xml)
exp
Export all message-data using an XML that could be interpreted by
any system.
With --bindings option, The folder used for bindings (default
../data/bindings)
With --broker option, This would override the broker configuration
from the bootstrap
With --large-messages option, The folder used for large-messages
(default ../data/largemessages)
With --verbose option, Adds more information on the execution
With --paging option, The folder used for paging (default
../data/paging)
With --paging option, The folder used for paging (default from
broker.xml)
With --journal option, The folder used for messages journal (default
../data/journal)
from broker.xml)
With --large-messages option, The folder used for large-messages
(default from broker.xml)
With --bindings option, The folder used for bindings (default from
broker.xml)
imp
Import all message-data using an XML that could be interpreted by
any system.
With --password option, User name used to import the data. (default
null)
With --port option, The port used to import the data (default 61616)
With --host option, The host used to import the data (default
localhost)
With --user option, User name used to import the data. (default
With --verbose option, Adds more information on the execution
With --port option, The port used to import the data (default 61616)
With --password option, User name used to import the data. (default
null)
With --transaction option, If this is set to true you will need a
whole transaction to commit at the end. (default false)
With --input option, The input file name (default=exp.dmp)
With --user option, User name used to import the data. (default
null)
decode
Decode a journal's internal format into a new journal set of files
With --prefix option, The journal prefix (default activemq-datal)
With --directory option, The journal folder (default
../data/journal)
With --broker option, This would override the broker configuration
from the bootstrap
With --suffix option, The journal suffix (default amq)
With --verbose option, Adds more information on the execution
With --paging option, The folder used for paging (default from
broker.xml)
With --prefix option, The journal prefix (default activemq-data)
With --file-size option, The journal size (default 10485760)
With --directory option, The journal folder (default journal folder
from broker.xml)
With --input option, The input file name (default=exp.dmp)
With --journal option, The folder used for messages journal (default
from broker.xml)
With --large-messages option, The folder used for large-messages
(default from broker.xml)
With --bindings option, The folder used for bindings (default from
broker.xml)
encode
Encode a set of journal files into an internal encoded data format
With --prefix option, The journal prefix (default activemq-datal)
With --directory option, The journal folder (default the journal
folder from broker.xml)
With --directory option, The journal folder (default
../data/journal)
With --broker option, This would override the broker configuration
from the bootstrap
With --suffix option, The journal suffix (default amq)
With --verbose option, Adds more information on the execution
With --paging option, The folder used for paging (default from
broker.xml)
With --prefix option, The journal prefix (default activemq-data)
With --file-size option, The journal size (default 10485760)
With --journal option, The folder used for messages journal (default
from broker.xml)
With --large-messages option, The folder used for large-messages
(default from broker.xml)
With --bindings option, The folder used for bindings (default from
broker.xml)
compact
Compacts the journal of a non running server
With --broker option, This would override the broker configuration
from the bootstrap
With --verbose option, Adds more information on the execution
With --paging option, The folder used for paging (default from
broker.xml)
With --journal option, The folder used for messages journal (default
from broker.xml)
With --large-messages option, The folder used for large-messages
(default from broker.xml)
With --bindings option, The folder used for bindings (default from
broker.xml)
```

View File

@ -54,7 +54,7 @@ public class ConsumerStuckTest extends ActiveMQTestBase {
@Test
public void testClientStuckTest() throws Exception {
ServerLocator locator = createNettyNonHALocator().setConnectionTTL(1000).setClientFailureCheckPeriod(100).setConsumerWindowSize(10 * 1024 * 1024);
ServerLocator locator = createNettyNonHALocator().setConnectionTTL(1000).setClientFailureCheckPeriod(100).setConsumerWindowSize(10 * 1024 * 1024).setCallTimeout(1000);
ClientSessionFactory sf = locator.createSessionFactory();
((ClientSessionFactoryImpl) sf).stopPingingAfterOne();
@ -146,7 +146,7 @@ public class ConsumerStuckTest extends ActiveMQTestBase {
@Test
public void testClientStuckTestWithDirectDelivery() throws Exception {
ServerLocator locator = createNettyNonHALocator().setConnectionTTL(1000).setClientFailureCheckPeriod(100).setConsumerWindowSize(10 * 1024 * 1024);
ServerLocator locator = createNettyNonHALocator().setConnectionTTL(1000).setClientFailureCheckPeriod(100).setConsumerWindowSize(10 * 1024 * 1024).setCallTimeout(1000);
ClientSessionFactory sf = locator.createSessionFactory();
((ClientSessionFactoryImpl) sf).stopPingingAfterOne();

View File

@ -427,7 +427,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
SequentialFileFactory messagesFF = new NIOSequentialFileFactory(server.getConfiguration().getBindingsLocation(), null, 1);
JournalImpl messagesJournal = new JournalImpl(1024 * 1024, 2, 0, 0, messagesFF, "activemq-bindings", "bindings", 1);
JournalImpl messagesJournal = new JournalImpl(1024 * 1024, 2, 2, 0, 0, messagesFF, "activemq-bindings", "bindings", 1);
messagesJournal.start();

View File

@ -191,7 +191,7 @@ public class JournalCrashTest extends ActiveMQTestBase {
*/
private void printJournal() throws Exception {
NIOSequentialFileFactory factory = new NIOSequentialFileFactory(new File(getJournalDir()), 100);
JournalImpl journal = new JournalImpl(ActiveMQDefaultConfiguration.getDefaultJournalFileSize(), 2, 0, 0, factory, "activemq-data", "amq", 100);
JournalImpl journal = new JournalImpl(ActiveMQDefaultConfiguration.getDefaultJournalFileSize(), 2, 2, 0, 0, factory, "activemq-data", "amq", 100);
ArrayList<RecordInfo> records = new ArrayList<>();
ArrayList<PreparedTransactionInfo> transactions = new ArrayList<>();

View File

@ -1496,7 +1496,7 @@ public class PagingTest extends ActiveMQTestBase {
List<PreparedTransactionInfo> list = new ArrayList<>();
JournalImpl jrn = new JournalImpl(config.getJournalFileSize(), 2, 0, 0, new NIOSequentialFileFactory(server.getConfiguration().getJournalLocation(), 1), "activemq-data", "amq", 1);
JournalImpl jrn = new JournalImpl(config.getJournalFileSize(), 2, 2, 0, 0, new NIOSequentialFileFactory(server.getConfiguration().getJournalLocation(), 1), "activemq-data", "amq", 1);
jrn.start();
jrn.load(records, list, null);

View File

@ -266,7 +266,7 @@ public class RedeliveryConsumerTest extends ActiveMQTestBase {
server.stop();
JournalImpl journal = new JournalImpl(server.getConfiguration().getJournalFileSize(), 2, 0, 0, new NIOSequentialFileFactory(server.getConfiguration().getJournalLocation(), 1), "activemq-data", "amq", 1);
JournalImpl journal = new JournalImpl(server.getConfiguration().getJournalFileSize(), 2, 2, 0, 0, new NIOSequentialFileFactory(server.getConfiguration().getJournalLocation(), 1), "activemq-data", "amq", 1);
final AtomicInteger updates = new AtomicInteger();

View File

@ -1756,7 +1756,7 @@ public class BridgeTest extends ActiveMQTestBase {
protected Map<Long, AtomicInteger> loadQueues(ActiveMQServer serverToInvestigate) throws Exception {
SequentialFileFactory messagesFF = new NIOSequentialFileFactory(serverToInvestigate.getConfiguration().getJournalLocation(), 1);
JournalImpl messagesJournal = new JournalImpl(serverToInvestigate.getConfiguration().getJournalFileSize(), serverToInvestigate.getConfiguration().getJournalMinFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1);
JournalImpl messagesJournal = new JournalImpl(serverToInvestigate.getConfiguration().getJournalFileSize(), serverToInvestigate.getConfiguration().getJournalMinFiles(), serverToInvestigate.getConfiguration().getJournalPoolFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1);
List<RecordInfo> records = new LinkedList<>();
List<PreparedTransactionInfo> preparedTransactions = new LinkedList<>();

View File

@ -207,7 +207,7 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
final byte recordType = (byte) 0;
journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO);
journal = new JournalImpl(fileSize, minFiles, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO);
journal.start();
@ -486,7 +486,7 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
final CountDownLatch latchDone = new CountDownLatch(1);
final CountDownLatch latchWait = new CountDownLatch(1);
journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO) {
journal = new JournalImpl(fileSize, minFiles, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO) {
@Override
protected SequentialFile createControlFile(final List<JournalFile> files,

View File

@ -318,7 +318,7 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase {
}
public static JournalImpl createJournal(final String journalType, final String journalDir) {
JournalImpl journal = new JournalImpl(10485760, 2, 0, 0, ValidateTransactionHealthTest.getFactory(journalType, journalDir), "journaltst", "tst", 500);
JournalImpl journal = new JournalImpl(10485760, 2, 2, 0, 0, ValidateTransactionHealthTest.getFactory(journalType, journalDir), "journaltst", "tst", 500);
return journal;
}

View File

@ -180,7 +180,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
@Test
public void testSpeedTransactional() throws Exception {
Journal journal = new JournalImpl(10 * 1024 * 1024, 10, 0, 0, getFileFactory(), "activemq-data", "amq", 5000);
Journal journal = new JournalImpl(10 * 1024 * 1024, 10, 10, 0, 0, getFileFactory(), "activemq-data", "amq", 5000);
journal.start();
@ -236,7 +236,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
JournalImplTestUnit.log.debug("num Files=" + numFiles);
Journal journal = new JournalImpl(10 * 1024 * 1024, numFiles, 0, 0, getFileFactory(), "activemq-data", "amq", 5000);
Journal journal = new JournalImpl(10 * 1024 * 1024, numFiles, numFiles, 0, 0, getFileFactory(), "activemq-data", "amq", 5000);
journal.start();
@ -259,7 +259,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
journal.stop();
journal = new JournalImpl(10 * 1024 * 1024, numFiles, 0, 0, getFileFactory(), "activemq-data", "amq", 5000);
journal = new JournalImpl(10 * 1024 * 1024, numFiles, numFiles, 0, 0, getFileFactory(), "activemq-data", "amq", 5000);
journal.start();
journal.load(new ArrayList<RecordInfo>(), null, null);

View File

@ -0,0 +1,440 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.performance.storage;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.replication.ReplicationManager;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class PersistMultiThreadTest extends ActiveMQTestBase {
final String DIRECTORY = "./target/journaltmp";
FakePagingStore fakePagingStore = new FakePagingStore();
@Test
public void testMultipleWrites() throws Exception {
deleteDirectory(new File(DIRECTORY));
ActiveMQServer server = createServer(true);
server.getConfiguration().setJournalCompactMinFiles(ActiveMQDefaultConfiguration.getDefaultJournalCompactMinFiles());
server.getConfiguration().setJournalCompactPercentage(ActiveMQDefaultConfiguration.getDefaultJournalCompactPercentage());
server.getConfiguration().setJournalDirectory(DIRECTORY + "/journal");
server.getConfiguration().setBindingsDirectory(DIRECTORY + "/bindings");
server.getConfiguration().setPagingDirectory(DIRECTORY + "/paging");
server.getConfiguration().setLargeMessagesDirectory(DIRECTORY + "/largemessage");
server.getConfiguration().setJournalFileSize(10 * 1024 * 1024);
server.getConfiguration().setJournalMinFiles(2);
server.getConfiguration().setJournalType(JournalType.ASYNCIO);
server.start();
StorageManager storage = server.getStorageManager();
long msgID = storage.generateID();
System.out.println("msgID=" + msgID);
int NUMBER_OF_THREADS = 50;
int NUMBER_OF_MESSAGES = 5000;
MyThread[] threads = new MyThread[NUMBER_OF_THREADS];
final CountDownLatch alignFlag = new CountDownLatch(NUMBER_OF_THREADS);
final CountDownLatch startFlag = new CountDownLatch(1);
final CountDownLatch finishFlag = new CountDownLatch(NUMBER_OF_THREADS);
MyDeleteThread deleteThread = new MyDeleteThread("deleteThread", storage, NUMBER_OF_MESSAGES * NUMBER_OF_THREADS * 10);
deleteThread.start();
for (int i = 0; i < threads.length; i++) {
threads[i] = new MyThread("writer::" + i, storage, NUMBER_OF_MESSAGES, alignFlag, startFlag, finishFlag);
}
for (MyThread t : threads) {
t.start();
}
alignFlag.await();
long startTime = System.currentTimeMillis();
startFlag.countDown();
// I'm using a countDown to avoid measuring time spent on thread context from join.
// i.e. i want to measure as soon as the loops are done
finishFlag.await();
long endtime = System.currentTimeMillis();
System.out.println("Time:: " + (endtime - startTime));
for (MyThread t : threads) {
t.join();
Assert.assertEquals(0, t.errors.get());
}
deleteThread.join();
Assert.assertEquals(0, deleteThread.errors.get());
}
LinkedBlockingDeque<Long> deletes = new LinkedBlockingDeque<>();
class MyThread extends Thread {
final StorageManager storage;
final int numberOfMessages;
final AtomicInteger errors = new AtomicInteger(0);
final CountDownLatch align;
final CountDownLatch start;
final CountDownLatch finish;
MyThread(String name,
StorageManager storage,
int numberOfMessages,
CountDownLatch align,
CountDownLatch start,
CountDownLatch finish) {
super(name);
this.storage = storage;
this.numberOfMessages = numberOfMessages;
this.align = align;
this.start = start;
this.finish = finish;
}
public void run() {
try {
align.countDown();
start.await();
long id = storage.generateID();
long txID = storage.generateID();
// each thread will store a single message that will never be deleted, trying to force compacting to happen
storeMessage(txID, id);
storage.commit(txID);
OperationContext ctx = storage.getContext();
for (int i = 0; i < numberOfMessages; i++) {
txID = storage.generateID();
long[] messageID = new long[10];
for (int msgI = 0; msgI < 10; msgI++) {
id = storage.generateID();
messageID[msgI] = id;
storeMessage(txID, id);
}
storage.commit(txID);
ctx.waitCompletion();
for (long deleteID : messageID) {
deletes.add(deleteID);
}
}
}
catch (Exception e) {
e.printStackTrace();
errors.incrementAndGet();
}
finally {
finish.countDown();
}
}
private void storeMessage(long txID, long id) throws Exception {
ServerMessage message = new ServerMessageImpl(id, 10 * 1024);
message.setPagingStore(fakePagingStore);
message.getBodyBuffer().writeBytes(new byte[104]);
message.putStringProperty("hello", "" + id);
storage.storeMessageTransactional(txID, message);
storage.storeReferenceTransactional(txID, 1, id);
message.decrementRefCount();
}
}
class MyDeleteThread extends Thread {
final StorageManager storage;
final int numberOfMessages;
final AtomicInteger errors = new AtomicInteger(0);
MyDeleteThread(String name, StorageManager storage, int numberOfMessages) {
super(name);
this.storage = storage;
this.numberOfMessages = numberOfMessages;
}
public void run() {
long deletesNr = 0;
try {
for (int i = 0; i < numberOfMessages; i++) {
if (i % 1000 == 0) {
// storage.getContext().waitCompletion();
// deletesNr = 0;
// Thread.sleep(200);
}
deletesNr++;
Long deleteID = deletes.poll(10, TimeUnit.MINUTES);
if (deleteID == null) {
System.err.println("Coudn't poll delete info");
errors.incrementAndGet();
break;
}
storage.storeAcknowledge(1, deleteID);
storage.deleteMessage(deleteID);
}
}
catch (Exception e) {
e.printStackTrace(System.out);
errors.incrementAndGet();
}
finally {
System.err.println("Finished the delete loop!!!! deleted " + deletesNr);
}
}
}
class FakePagingStore implements PagingStore {
@Override
public SimpleString getAddress() {
return null;
}
@Override
public int getNumberOfPages() {
return 0;
}
@Override
public int getCurrentWritingPage() {
return 0;
}
@Override
public SimpleString getStoreName() {
return null;
}
@Override
public File getFolder() {
return null;
}
@Override
public AddressFullMessagePolicy getAddressFullMessagePolicy() {
return null;
}
@Override
public long getFirstPage() {
return 0;
}
@Override
public long getPageSizeBytes() {
return 0;
}
@Override
public long getAddressSize() {
return 0;
}
@Override
public long getMaxSize() {
return 0;
}
@Override
public void applySetting(AddressSettings addressSettings) {
}
@Override
public boolean isPaging() {
return false;
}
@Override
public void sync() throws Exception {
}
@Override
public void ioSync() throws Exception {
}
@Override
public boolean page(ServerMessage message,
Transaction tx,
RouteContextList listCtx,
ReentrantReadWriteLock.ReadLock readLock) throws Exception {
return false;
}
@Override
public Page createPage(int page) throws Exception {
return null;
}
@Override
public boolean checkPageFileExists(int page) throws Exception {
return false;
}
@Override
public PagingManager getPagingManager() {
return null;
}
@Override
public PageCursorProvider getCursorProvider() {
return null;
}
@Override
public void processReload() throws Exception {
}
@Override
public Page depage() throws Exception {
return null;
}
@Override
public void forceAnotherPage() throws Exception {
}
@Override
public Page getCurrentPage() {
return null;
}
@Override
public boolean startPaging() throws Exception {
return false;
}
@Override
public void stopPaging() throws Exception {
}
@Override
public void addSize(int size) {
}
@Override
public boolean checkMemory(Runnable runnable) {
return false;
}
@Override
public boolean lock(long timeout) {
return false;
}
@Override
public void unlock() {
}
@Override
public void flushExecutors() {
}
@Override
public Collection<Integer> getCurrentIds() throws Exception {
return null;
}
@Override
public void sendPages(ReplicationManager replicator, Collection<Integer> pageIds) throws Exception {
}
@Override
public void disableCleanup() {
}
@Override
public void enableCleanup() {
}
@Override
public void start() throws Exception {
}
@Override
public void stop() throws Exception {
}
@Override
public boolean isStarted() {
return false;
}
}
}

View File

@ -0,0 +1,297 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.performance.storage;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.DefaultConnectionProperties;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Test;
public class SendReceiveMultiThreadTest extends ActiveMQTestBase {
final String DIRECTORY = "./target/journaltmp";
ConnectionFactory cf;
Destination destination;
AtomicInteger received = new AtomicInteger(0);
AtomicInteger sent = new AtomicInteger(0);
int NUMBER_OF_THREADS = 400;
int NUMBER_OF_MESSAGES = 5000;
CountDownLatch receivedLatch = new CountDownLatch(NUMBER_OF_MESSAGES * NUMBER_OF_THREADS);
@Test
public void testMultipleWrites() throws Exception {
deleteDirectory(new File(DIRECTORY));
ActiveMQServer server = createServer(true);
server.getConfiguration().setJournalFileSize(10 * 1024 * 1024);
server.getConfiguration().setJournalMinFiles(2);
server.getConfiguration().setJournalCompactMinFiles(ActiveMQDefaultConfiguration.getDefaultJournalCompactMinFiles());
server.getConfiguration().setJournalCompactPercentage(ActiveMQDefaultConfiguration.getDefaultJournalCompactPercentage());
server.getConfiguration().setJournalType(JournalType.ASYNCIO);
server.getConfiguration().addAcceptorConfiguration("core", DefaultConnectionProperties.DEFAULT_BROKER_BIND_URL);
server.getConfiguration().setJournalDirectory(DIRECTORY + "/journal");
server.getConfiguration().setBindingsDirectory(DIRECTORY + "/bindings");
server.getConfiguration().setPagingDirectory(DIRECTORY + "/paging");
server.getConfiguration().setLargeMessagesDirectory(DIRECTORY + "/largemessage");
server.getConfiguration().setJournalMaxIO_AIO(200);
// TODO Setup Acceptors
server.start();
Queue queue = server.createQueue(SimpleString.toSimpleString("jms.queue.performanceQueue"), SimpleString.toSimpleString("jms.queue.performanceQueue"), null, true, false);
Queue queue2 = server.createQueue(SimpleString.toSimpleString("jms.queue.stationaryQueue"), SimpleString.toSimpleString("jms.queue.stationaryQueue"), null, true, false);
MyThread[] threads = new MyThread[NUMBER_OF_THREADS];
ConsumerThread[] cthreads = new ConsumerThread[NUMBER_OF_THREADS];
final CountDownLatch alignFlag = new CountDownLatch(NUMBER_OF_THREADS);
final CountDownLatch startFlag = new CountDownLatch(1);
final CountDownLatch finishFlag = new CountDownLatch(NUMBER_OF_THREADS);
cf = new ActiveMQConnectionFactory();
Thread slowSending = new Thread() {
public void run() {
Connection conn = null;
try {
conn = cf.createConnection();
Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(ActiveMQJMSClient.createQueue("stationaryQueue"));
conn.start();
MessageConsumer consumer = session.createConsumer(ActiveMQJMSClient.createQueue("stationaryQueue"));
while (true) {
for (int i = 0; i < 10; i++) {
System.out.println("stationed message");
producer.send(session.createTextMessage("stationed"));
session.commit();
Thread.sleep(1000);
}
for (int i = 0; i < 10; i++) {
consumer.receive(5000);
session.commit();
System.out.println("Receiving stationed");
Thread.sleep(1000);
}
}
}
catch (Exception e) {
e.printStackTrace();
}
finally {
try {
conn.close();
}
catch (Exception ignored) {
}
}
}
};
slowSending.start();
destination = ActiveMQJMSClient.createQueue("performanceQueue");
for (int i = 0; i < threads.length; i++) {
threads[i] = new MyThread("sender::" + i, NUMBER_OF_MESSAGES, alignFlag, startFlag, finishFlag);
cthreads[i] = new ConsumerThread(NUMBER_OF_MESSAGES);
}
for (ConsumerThread t : cthreads) {
t.start();
}
for (MyThread t : threads) {
t.start();
}
Assert.assertEquals(NUMBER_OF_THREADS, queue.getConsumerCount());
alignFlag.await();
long startTime = System.currentTimeMillis();
startFlag.countDown();
// I'm using a countDown to avoid measuring time spent on thread context from join.
// i.e. i want to measure as soon as the loops are done
finishFlag.await();
long endtime = System.currentTimeMillis();
receivedLatch.await();
long endTimeConsuming = System.currentTimeMillis();
for (ConsumerThread t : cthreads) {
t.join();
Assert.assertEquals(0, t.errors);
}
for (MyThread t : threads) {
t.join();
Assert.assertEquals(0, t.errors.get());
}
slowSending.interrupt();
slowSending.join();
server.stop();
System.out.println("Time on sending:: " + (endtime - startTime));
System.out.println("Time on consuming:: " + (endTimeConsuming - startTime));
}
class ConsumerThread extends Thread {
final int numberOfMessages;
Connection connection;
Session session;
MessageConsumer consumer;
ConsumerThread(int numberOfMessages) throws Exception {
super("consumerthread");
this.numberOfMessages = numberOfMessages;
connection = cf.createConnection();
session = connection.createSession(true, Session.SESSION_TRANSACTED);
consumer = session.createConsumer(destination);
connection.start();
}
int errors = 0;
public void run() {
try {
for (int i = 0; i < numberOfMessages; i++) {
Message message = consumer.receive(50000);
if (message == null) {
System.err.println("Could not receive message at i = " + numberOfMessages);
errors++;
break;
}
int r = received.incrementAndGet();
if (r % 1000 == 0) {
System.out.println("Received " + r + " messages");
}
if (i % 50 == 0) {
session.commit();
}
receivedLatch.countDown();
}
session.commit();
connection.close();
}
catch (Exception e) {
e.printStackTrace();
errors++;
}
}
}
class MyThread extends Thread {
final int numberOfMessages;
final AtomicInteger errors = new AtomicInteger(0);
final CountDownLatch align;
final CountDownLatch start;
final CountDownLatch finish;
MyThread(String name, int numberOfMessages, CountDownLatch align, CountDownLatch start, CountDownLatch finish) {
super(name);
this.numberOfMessages = numberOfMessages;
this.align = align;
this.start = start;
this.finish = finish;
}
public void run() {
try {
Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(destination);
align.countDown();
start.await();
for (int i = 0; i < numberOfMessages; i++) {
BytesMessage msg = session.createBytesMessage();
msg.writeBytes(new byte[1024]);
producer.send(msg);
session.commit();
int s = sent.incrementAndGet();
if (s % 1000 == 0) {
System.out.println("Sent " + s);
}
}
connection.close();
System.out.println("Send " + numberOfMessages + " messages on thread " + Thread.currentThread().getName());
}
catch (Exception e) {
e.printStackTrace();
errors.incrementAndGet();
}
finally {
finish.countDown();
}
}
}
}

View File

@ -75,7 +75,7 @@ public class AddAndRemoveStressTest extends ActiveMQTestBase {
public void testInsertAndLoad() throws Exception {
SequentialFileFactory factory = new AIOSequentialFileFactory(getTestDirfile(), 1000);
JournalImpl impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000);
JournalImpl impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000);
impl.start();
@ -91,7 +91,7 @@ public class AddAndRemoveStressTest extends ActiveMQTestBase {
impl.stop();
factory = new AIOSequentialFileFactory(getTestDirfile(), 1000);
impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000);
impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000);
impl.start();
@ -108,7 +108,7 @@ public class AddAndRemoveStressTest extends ActiveMQTestBase {
impl.stop();
factory = new AIOSequentialFileFactory(getTestDirfile(), 1000);
impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000);
impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000);
impl.start();
@ -136,7 +136,7 @@ public class AddAndRemoveStressTest extends ActiveMQTestBase {
public void testInsertUpdateAndLoad() throws Exception {
SequentialFileFactory factory = new AIOSequentialFileFactory(getTestDirfile(), 1000);
JournalImpl impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000);
JournalImpl impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000);
impl.start();
@ -153,7 +153,7 @@ public class AddAndRemoveStressTest extends ActiveMQTestBase {
impl.stop();
factory = new AIOSequentialFileFactory(getTestDirfile(), 1000);
impl = new JournalImpl(10 * 1024 * 1024, 10, 0, 0, factory, "amq", "amq", 1000);
impl = new JournalImpl(10 * 1024 * 1024, 10, 10, 0, 0, factory, "amq", "amq", 1000);
impl.start();
@ -170,7 +170,7 @@ public class AddAndRemoveStressTest extends ActiveMQTestBase {
impl.stop();
factory = new AIOSequentialFileFactory(getTestDirfile(), 1000);
impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000);
impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000);
impl.start();

View File

@ -115,7 +115,7 @@ public class JournalCleanupCompactStressTest extends ActiveMQTestBase {
maxAIO = ActiveMQDefaultConfiguration.getDefaultJournalMaxIoNio();
}
journal = new JournalImpl(50 * 1024, 20, 50, ActiveMQDefaultConfiguration.getDefaultJournalCompactPercentage(), factory, "activemq-data", "amq", maxAIO) {
journal = new JournalImpl(50 * 1024, 20, 20, 50, ActiveMQDefaultConfiguration.getDefaultJournalCompactPercentage(), factory, "activemq-data", "amq", maxAIO) {
@Override
protected void onCompactLockingTheJournal() throws Exception {
}

View File

@ -98,7 +98,7 @@ public abstract class MixupCompactorTestBase extends JournalImplTestBase {
@Override
public void createJournal() throws Exception {
journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO) {
journal = new JournalImpl(fileSize, minFiles, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO) {
@Override
public void onCompactDone() {

View File

@ -84,7 +84,7 @@ public class NIOMultiThreadCompactorStressTest extends ActiveMQTestBase {
stopServer();
NIOSequentialFileFactory factory = new NIOSequentialFileFactory(new File(getJournalDir()), 1);
JournalImpl journal = new JournalImpl(ActiveMQDefaultConfiguration.getDefaultJournalFileSize(), 2, 0, 0, factory, "activemq-data", "amq", 100);
JournalImpl journal = new JournalImpl(ActiveMQDefaultConfiguration.getDefaultJournalFileSize(), 2, 2, 0, 0, factory, "activemq-data", "amq", 100);
List<RecordInfo> committedRecords = new ArrayList<>();
List<PreparedTransactionInfo> preparedTransactions = new ArrayList<>();

View File

@ -141,7 +141,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
factory = new FakeSequentialFileFactory(512, true);
try {
journalImpl = new JournalImpl(2000, 2, 0, 0, factory, "tt", "tt", 1000);
journalImpl = new JournalImpl(2000, 2, 2, 0, 0, factory, "tt", "tt", 1000);
Assert.fail("Expected IllegalArgumentException");
}
catch (IllegalArgumentException ignored) {
@ -1201,7 +1201,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
public void testAlignmentOverReload() throws Exception {
factory = new FakeSequentialFileFactory(512, false);
journalImpl = new JournalImpl(512 + 512 * 3, 20, 0, 0, factory, "amq", "amq", 1000);
journalImpl = new JournalImpl(512 + 512 * 3, 20, 20, 0, 0, factory, "amq", "amq", 1000);
journalImpl.start();
@ -1214,7 +1214,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
journalImpl.stop();
journalImpl = new JournalImpl(512 + 1024 + 512, 20, 0, 0, factory, "amq", "amq", 1000);
journalImpl = new JournalImpl(512 + 1024 + 512, 20, 20, 0, 0, factory, "amq", "amq", 1000);
addActiveMQComponent(journalImpl);
journalImpl.start();
journalImpl.load(AlignedJournalImplTest.dummyLoader);
@ -1230,7 +1230,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
journalImpl.stop();
journalImpl = new JournalImpl(512 + 1024 + 512, 20, 0, 0, factory, "amq", "amq", 1000);
journalImpl = new JournalImpl(512 + 1024 + 512, 20, 20, 0, 0, factory, "amq", "amq", 1000);
addActiveMQComponent(journalImpl);
journalImpl.start();
@ -1301,7 +1301,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
journalImpl.stop();
}
journalImpl = new JournalImpl(journalSize, numberOfMinimalFiles, 0, 0, factory, "tt", "tt", 1000);
journalImpl = new JournalImpl(journalSize, numberOfMinimalFiles, numberOfMinimalFiles, 0, 0, factory, "tt", "tt", 1000);
addActiveMQComponent(journalImpl);
journalImpl.start();

View File

@ -209,7 +209,7 @@ public class JournalAsyncTest extends ActiveMQTestBase {
journalImpl.stop();
}
journalImpl = new JournalImpl(journalSize, numberOfMinimalFiles, 0, 0, factory, "tt", "tt", 1000);
journalImpl = new JournalImpl(journalSize, numberOfMinimalFiles, numberOfMinimalFiles, 0, 0, factory, "tt", "tt", 1000);
journalImpl.start();

View File

@ -54,6 +54,8 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
protected int minFiles;
protected int poolSize;
protected int fileSize;
protected boolean sync;
@ -122,7 +124,16 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
// ---------------------------------------------------------------------------------
protected void setup(final int minFreeFiles, final int fileSize, final boolean sync, final int maxAIO) {
this.minFiles = minFreeFiles;
this.poolSize = minFreeFiles;
this.fileSize = fileSize;
this.sync = sync;
this.maxAIO = maxAIO;
}
protected void setup(final int minFreeFiles, final int poolSize, final int fileSize, final boolean sync, final int maxAIO) {
minFiles = minFreeFiles;
this.poolSize = poolSize;
this.fileSize = fileSize;
this.sync = sync;
this.maxAIO = maxAIO;
@ -130,13 +141,14 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
protected void setup(final int minFreeFiles, final int fileSize, final boolean sync) {
minFiles = minFreeFiles;
poolSize = minFreeFiles;
this.fileSize = fileSize;
this.sync = sync;
maxAIO = 50;
}
public void createJournal() throws Exception {
journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO) {
journal = new JournalImpl(fileSize, minFiles, poolSize, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO) {
@Override
public void onCompactDone() {
latchDone.countDown();

View File

@ -121,7 +121,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
@Test
public void testParams() throws Exception {
try {
new JournalImpl(JournalImpl.MIN_FILE_SIZE - 1, 10, 0, 0, fileFactory, filePrefix, fileExtension, 1);
new JournalImpl(JournalImpl.MIN_FILE_SIZE - 1, 10, 10, 0, 0, fileFactory, filePrefix, fileExtension, 1);
Assert.fail("Should throw exception");
}
@ -130,7 +130,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
}
try {
new JournalImpl(10 * 1024, 1, 0, 0, fileFactory, filePrefix, fileExtension, 1);
new JournalImpl(10 * 1024, 1, 0, 0, 0, fileFactory, filePrefix, fileExtension, 1);
Assert.fail("Should throw exception");
}
@ -139,7 +139,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
}
try {
new JournalImpl(10 * 1024, 10, 0, 0, null, filePrefix, fileExtension, 1);
new JournalImpl(10 * 1024, 10, 0, 0, 0, null, filePrefix, fileExtension, 1);
Assert.fail("Should throw exception");
}
@ -148,7 +148,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
}
try {
new JournalImpl(10 * 1024, 10, 0, 0, fileFactory, null, fileExtension, 1);
new JournalImpl(10 * 1024, 10, 0, 0, 0, fileFactory, null, fileExtension, 1);
Assert.fail("Should throw exception");
}
@ -157,7 +157,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
}
try {
new JournalImpl(10 * 1024, 10, 0, 0, fileFactory, filePrefix, null, 1);
new JournalImpl(10 * 1024, 10, 0, 0, 0, fileFactory, filePrefix, null, 1);
Assert.fail("Should throw exception");
}
@ -166,7 +166,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
}
try {
new JournalImpl(10 * 1024, 10, 0, 0, fileFactory, filePrefix, null, 0);
new JournalImpl(10 * 1024, 10, 0, 0, 0, fileFactory, filePrefix, null, 0);
Assert.fail("Should throw exception");
}
@ -567,6 +567,103 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
stopJournal();
}
@Test
public void testOrganicallyGrowNoLimit() throws Exception {
setup(2, -1, 10 * 1024, true, 50);
createJournal();
journal.setAutoReclaim(true);
startJournal();
load();
List<String> files1 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(2, files1.size());
Assert.assertEquals(0, journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(0, journal.getIDMapSize());
// Fill all the files
for (int i = 0; i < 200; i++) {
add(i);
journal.forceMoveNextFile();
}
for (int i = 0; i < 200; i++) {
delete(i);
}
journal.forceMoveNextFile();
journal.checkReclaimStatus();
files1 = fileFactory.listFiles(fileExtension);
Assert.assertTrue(files1.size() > 200);
int numberOfFiles = files1.size();
for (int i = 300; i < 350; i++) {
add(i);
journal.forceMoveNextFile();
}
journal.checkReclaimStatus();
files1 = fileFactory.listFiles(fileExtension);
Assert.assertTrue(files1.size() > 200);
Assert.assertEquals(numberOfFiles, files1.size());
System.out.println("we have " + files1.size() + " files now");
stopJournal();
}
@Test
public void testOrganicallyWithALimit() throws Exception {
setup(2, 5, 10 * 1024, true, 50);
createJournal();
journal.setAutoReclaim(true);
startJournal();
load();
List<String> files1 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(2, files1.size());
Assert.assertEquals(0, journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(0, journal.getIDMapSize());
// Fill all the files
for (int i = 0; i < 200; i++) {
add(i);
journal.forceMoveNextFile();
}
journal.checkReclaimStatus();
for (int i = 0; i < 200; i++) {
delete(i);
}
journal.forceMoveNextFile();
journal.checkReclaimStatus();
files1 = fileFactory.listFiles(fileExtension);
Assert.assertTrue("supposed to have less than 10 but it had " + files1.size() + " files created", files1.size() < 10);
stopJournal();
}
// Validate the methods that are used on assertions
@Test
public void testCalculations() throws Exception {

View File

@ -39,7 +39,7 @@ public class BatchIDGeneratorUnitTest extends ActiveMQTestBase {
@Test
public void testSequence() throws Exception {
NIOSequentialFileFactory factory = new NIOSequentialFileFactory(new File(getTestDir()), 1);
Journal journal = new JournalImpl(10 * 1024, 2, 0, 0, factory, "activemq-bindings", "bindings", 1);
Journal journal = new JournalImpl(10 * 1024, 2, 2, 0, 0, factory, "activemq-bindings", "bindings", 1);
journal.start();