mirror of
https://github.com/apache/activemq.git
synced 2025-02-16 23:16:52 +00:00
https://issues.apache.org/jira/browse/AMQ-5603 - reverting default preallocatonScope to entire_journal b/c async only really works for ssd
This commit is contained in:
parent
e3a68717f1
commit
65cef69130
@ -259,7 +259,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||||||
int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
|
int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
|
||||||
boolean enableIndexWriteAsync = false;
|
boolean enableIndexWriteAsync = false;
|
||||||
int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
|
int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
|
||||||
private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL_ASYNC.name();
|
private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL.name();
|
||||||
private String preallocationStrategy = Journal.PreallocationStrategy.SPARSE_FILE.name();
|
private String preallocationStrategy = Journal.PreallocationStrategy.SPARSE_FILE.name();
|
||||||
|
|
||||||
protected AtomicBoolean opened = new AtomicBoolean();
|
protected AtomicBoolean opened = new AtomicBoolean();
|
||||||
|
@ -194,7 +194,7 @@ public class Journal {
|
|||||||
private final AtomicReference<DataFile> currentDataFile = new AtomicReference<>(null);
|
private final AtomicReference<DataFile> currentDataFile = new AtomicReference<>(null);
|
||||||
private volatile DataFile nextDataFile;
|
private volatile DataFile nextDataFile;
|
||||||
|
|
||||||
protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL_ASYNC;
|
protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL;
|
||||||
protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
|
protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
|
||||||
private File osKernelCopyTemplateFile = null;
|
private File osKernelCopyTemplateFile = null;
|
||||||
|
|
||||||
@ -283,10 +283,6 @@ public class Journal {
|
|||||||
nextDataFileId = currentDataFile.get().dataFileId + 1;
|
nextDataFileId = currentDataFile.get().dataFileId + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (preallocationStrategy != PreallocationStrategy.SPARSE_FILE && maxFileLength != DEFAULT_MAX_FILE_LENGTH) {
|
|
||||||
LOG.warn("You are using a preallocation strategy and journal maxFileLength which should be benchmarked accordingly to not introduce unexpected latencies.");
|
|
||||||
}
|
|
||||||
|
|
||||||
if( lastAppendLocation.get()==null ) {
|
if( lastAppendLocation.get()==null ) {
|
||||||
DataFile df = dataFiles.getTail();
|
DataFile df = dataFiles.getTail();
|
||||||
lastAppendLocation.set(recoveryCheck(df));
|
lastAppendLocation.set(recoveryCheck(df));
|
||||||
|
@ -107,6 +107,7 @@ public class AMQ2832Test {
|
|||||||
// speed up the test case, checkpoint an cleanup early and often
|
// speed up the test case, checkpoint an cleanup early and often
|
||||||
adapter.setCheckpointInterval(5000);
|
adapter.setCheckpointInterval(5000);
|
||||||
adapter.setCleanupInterval(5000);
|
adapter.setCleanupInterval(5000);
|
||||||
|
adapter.setPreallocationScope(Journal.PreallocationScope.ENTIRE_JOURNAL.name());
|
||||||
|
|
||||||
if (recover) {
|
if (recover) {
|
||||||
adapter.setForceRecoverIndex(true);
|
adapter.setForceRecoverIndex(true);
|
||||||
@ -288,7 +289,7 @@ public class AMQ2832Test {
|
|||||||
assertTrue("Less than three journal file expected, was " + getNumberOfJournalFiles(), Wait.waitFor(new Wait.Condition() {
|
assertTrue("Less than three journal file expected, was " + getNumberOfJournalFiles(), Wait.waitFor(new Wait.Condition() {
|
||||||
@Override
|
@Override
|
||||||
public boolean isSatisified() throws Exception {
|
public boolean isSatisified() throws Exception {
|
||||||
return getNumberOfJournalFiles() <= 4;
|
return getNumberOfJournalFiles() <= 3;
|
||||||
}
|
}
|
||||||
}, TimeUnit.MINUTES.toMillis(3)));
|
}, TimeUnit.MINUTES.toMillis(3)));
|
||||||
|
|
||||||
|
@ -22,6 +22,7 @@ import org.apache.activemq.broker.region.policy.PolicyEntry;
|
|||||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
||||||
|
import org.apache.activemq.store.kahadb.disk.journal.Journal;
|
||||||
import org.apache.activemq.util.ConsumerThread;
|
import org.apache.activemq.util.ConsumerThread;
|
||||||
import org.apache.activemq.util.ProducerThread;
|
import org.apache.activemq.util.ProducerThread;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@ -78,6 +79,7 @@ public class AMQ3120Test {
|
|||||||
// speed up the test case, checkpoint an cleanup early and often
|
// speed up the test case, checkpoint an cleanup early and often
|
||||||
adapter.setCheckpointInterval(500);
|
adapter.setCheckpointInterval(500);
|
||||||
adapter.setCleanupInterval(500);
|
adapter.setCleanupInterval(500);
|
||||||
|
adapter.setPreallocationScope(Journal.PreallocationScope.ENTIRE_JOURNAL.name());
|
||||||
|
|
||||||
if (!deleteAllOnStart) {
|
if (!deleteAllOnStart) {
|
||||||
adapter.setForceRecoverIndex(true);
|
adapter.setForceRecoverIndex(true);
|
||||||
@ -114,7 +116,7 @@ public class AMQ3120Test {
|
|||||||
final int messageCount = 500;
|
final int messageCount = 500;
|
||||||
startBroker(true);
|
startBroker(true);
|
||||||
int fileCount = getFileCount(kahaDbDir);
|
int fileCount = getFileCount(kahaDbDir);
|
||||||
assertEquals(5, fileCount);
|
assertEquals(4, fileCount);
|
||||||
|
|
||||||
Connection connection = new ActiveMQConnectionFactory(
|
Connection connection = new ActiveMQConnectionFactory(
|
||||||
broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
|
broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
|
||||||
|
@ -27,6 +27,7 @@ import org.apache.activemq.broker.region.policy.PolicyEntry;
|
|||||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
||||||
|
import org.apache.activemq.store.kahadb.disk.journal.Journal;
|
||||||
import org.apache.activemq.util.ConsumerThread;
|
import org.apache.activemq.util.ConsumerThread;
|
||||||
import org.apache.activemq.util.ProducerThread;
|
import org.apache.activemq.util.ProducerThread;
|
||||||
import org.apache.activemq.util.Wait;
|
import org.apache.activemq.util.Wait;
|
||||||
@ -81,6 +82,7 @@ public class AMQ4323Test {
|
|||||||
// speed up the test case, checkpoint an cleanup early and often
|
// speed up the test case, checkpoint an cleanup early and often
|
||||||
adapter.setCheckpointInterval(500);
|
adapter.setCheckpointInterval(500);
|
||||||
adapter.setCleanupInterval(500);
|
adapter.setCleanupInterval(500);
|
||||||
|
adapter.setPreallocationScope(Journal.PreallocationScope.ENTIRE_JOURNAL.name());
|
||||||
|
|
||||||
if (!deleteAllOnStart) {
|
if (!deleteAllOnStart) {
|
||||||
adapter.setForceRecoverIndex(true);
|
adapter.setForceRecoverIndex(true);
|
||||||
@ -116,7 +118,7 @@ public class AMQ4323Test {
|
|||||||
final int messageCount = 500;
|
final int messageCount = 500;
|
||||||
startBroker(true);
|
startBroker(true);
|
||||||
int fileCount = getFileCount(kahaDbDir);
|
int fileCount = getFileCount(kahaDbDir);
|
||||||
assertEquals(5, fileCount);
|
assertEquals(4, fileCount);
|
||||||
|
|
||||||
Connection connection = new ActiveMQConnectionFactory(
|
Connection connection = new ActiveMQConnectionFactory(
|
||||||
broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
|
broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
|
||||||
@ -149,7 +151,7 @@ public class AMQ4323Test {
|
|||||||
public boolean isSatisified() throws Exception {
|
public boolean isSatisified() throws Exception {
|
||||||
int fileCount = getFileCount(kahaDbDir);
|
int fileCount = getFileCount(kahaDbDir);
|
||||||
LOG.info("current filecount:" + fileCount);
|
LOG.info("current filecount:" + fileCount);
|
||||||
return 5 == fileCount;
|
return 4 == fileCount;
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
|
|
||||||
|
@ -32,6 +32,7 @@ import javax.jms.Session;
|
|||||||
|
|
||||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.store.kahadb.disk.journal.Journal;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
@ -94,6 +95,7 @@ public class KahaDBIndexLocationTest {
|
|||||||
KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
|
KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
|
||||||
persistenceAdapter.setDirectory(kahaDataDir);
|
persistenceAdapter.setDirectory(kahaDataDir);
|
||||||
persistenceAdapter.setIndexDirectory(kahaIndexDir);
|
persistenceAdapter.setIndexDirectory(kahaIndexDir);
|
||||||
|
persistenceAdapter.setPreallocationScope(Journal.PreallocationScope.ENTIRE_JOURNAL.name());
|
||||||
|
|
||||||
broker.setDataDirectoryFile(testDataDir);
|
broker.setDataDirectoryFile(testDataDir);
|
||||||
broker.setUseJmx(false);
|
broker.setUseJmx(false);
|
||||||
@ -135,7 +137,7 @@ public class KahaDBIndexLocationTest {
|
|||||||
|
|
||||||
// Should contain the initial log for the journal and the lock.
|
// Should contain the initial log for the journal and the lock.
|
||||||
assertNotNull(journal);
|
assertNotNull(journal);
|
||||||
assertEquals(3, journal.length);
|
assertEquals(2, journal.length);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -40,6 +40,7 @@ import org.apache.activemq.broker.BrokerService;
|
|||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
import org.apache.activemq.command.ActiveMQTopic;
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
import org.apache.activemq.store.kahadb.disk.journal.DataFile;
|
import org.apache.activemq.store.kahadb.disk.journal.DataFile;
|
||||||
|
import org.apache.activemq.store.kahadb.disk.journal.Journal;
|
||||||
import org.apache.activemq.util.Wait;
|
import org.apache.activemq.util.Wait;
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.commons.io.filefilter.TrueFileFilter;
|
import org.apache.commons.io.filefilter.TrueFileFilter;
|
||||||
@ -75,6 +76,7 @@ public class SubscriptionRecoveryTest {
|
|||||||
File dataFile=new File("KahaDB");
|
File dataFile=new File("KahaDB");
|
||||||
pa.setDirectory(dataFile);
|
pa.setDirectory(dataFile);
|
||||||
pa.setJournalMaxFileLength(10*1024);
|
pa.setJournalMaxFileLength(10*1024);
|
||||||
|
pa.setPreallocationScope(Journal.PreallocationScope.ENTIRE_JOURNAL.name());
|
||||||
pa.setCheckpointInterval(TimeUnit.SECONDS.toMillis(5));
|
pa.setCheckpointInterval(TimeUnit.SECONDS.toMillis(5));
|
||||||
pa.setCleanupInterval(TimeUnit.SECONDS.toMillis(5));
|
pa.setCleanupInterval(TimeUnit.SECONDS.toMillis(5));
|
||||||
//Delete the index files on recovery
|
//Delete the index files on recovery
|
||||||
|
Loading…
x
Reference in New Issue
Block a user