This closes #1348
This commit is contained in:
commit
5b0a133863
|
@ -258,6 +258,9 @@ public final class ActiveMQDefaultConfiguration {
|
|||
// The percentage of live data on which we consider compacting the journal
|
||||
private static int DEFAULT_JOURNAL_COMPACT_PERCENTAGE = 30;
|
||||
|
||||
// The time to wait when opening a new journal file before failing
|
||||
private static int DEFAULT_JOURNAL_FILE_OPEN_TIMEOUT = 5;
|
||||
|
||||
// The minimal number of data files before we can start compacting
|
||||
private static int DEFAULT_JOURNAL_COMPACT_MIN_FILES = 10;
|
||||
|
||||
|
@ -782,6 +785,10 @@ public final class ActiveMQDefaultConfiguration {
|
|||
return DEFAULT_JOURNAL_COMPACT_PERCENTAGE;
|
||||
}
|
||||
|
||||
public static int getDefaultJournalFileOpenTimeout() {
|
||||
return DEFAULT_JOURNAL_FILE_OPEN_TIMEOUT;
|
||||
}
|
||||
|
||||
/**
|
||||
* The minimal number of data files before we can start compacting
|
||||
*/
|
||||
|
|
|
@ -81,6 +81,8 @@ public class JournalFilesRepository {
|
|||
|
||||
private final AtomicInteger freeFilesCount = new AtomicInteger(0);
|
||||
|
||||
private final int journalFileOpenTimeout;
|
||||
|
||||
private Executor openFilesExecutor;
|
||||
|
||||
private final Runnable pushOpenRunnable = new Runnable() {
|
||||
|
@ -102,7 +104,8 @@ public class JournalFilesRepository {
|
|||
final int maxAIO,
|
||||
final int fileSize,
|
||||
final int minFiles,
|
||||
final int poolSize) {
|
||||
final int poolSize,
|
||||
final int journalFileOpenTimeout) {
|
||||
if (filePrefix == null) {
|
||||
throw new IllegalArgumentException("filePrefix cannot be null");
|
||||
}
|
||||
|
@ -121,6 +124,7 @@ public class JournalFilesRepository {
|
|||
this.poolSize = poolSize;
|
||||
this.userVersion = userVersion;
|
||||
this.journal = journal;
|
||||
this.journalFileOpenTimeout = journalFileOpenTimeout;
|
||||
}
|
||||
|
||||
// Public --------------------------------------------------------
|
||||
|
@ -418,7 +422,7 @@ public class JournalFilesRepository {
|
|||
openFilesExecutor.execute(pushOpenRunnable);
|
||||
}
|
||||
|
||||
JournalFile nextFile = openedFiles.poll(5, TimeUnit.SECONDS);
|
||||
JournalFile nextFile = openedFiles.poll(journalFileOpenTimeout, TimeUnit.SECONDS);
|
||||
if (nextFile == null) {
|
||||
fileFactory.onIOError(ActiveMQJournalBundle.BUNDLE.fileNotOpened(), "unable to open ", null);
|
||||
// We need to reconnect the current file with the timed buffer as we were not able to roll the file forward
|
||||
|
|
|
@ -234,7 +234,21 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
final String fileExtension,
|
||||
final int maxAIO,
|
||||
final int userVersion) {
|
||||
this(null, fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, fileFactory, filePrefix, fileExtension, maxAIO, userVersion);
|
||||
this(null, fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, 5, fileFactory, filePrefix, fileExtension, maxAIO, userVersion);
|
||||
}
|
||||
|
||||
public JournalImpl(final int fileSize,
|
||||
final int minFiles,
|
||||
final int poolSize,
|
||||
final int compactMinFiles,
|
||||
final int compactPercentage,
|
||||
final int journalFileOpenTimeout,
|
||||
final SequentialFileFactory fileFactory,
|
||||
final String filePrefix,
|
||||
final String fileExtension,
|
||||
final int maxAIO,
|
||||
final int userVersion) {
|
||||
this(null, fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, journalFileOpenTimeout, fileFactory, filePrefix, fileExtension, maxAIO, userVersion);
|
||||
}
|
||||
|
||||
public JournalImpl(final ExecutorFactory ioExecutors,
|
||||
|
@ -248,6 +262,21 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
final String fileExtension,
|
||||
final int maxAIO,
|
||||
final int userVersion) {
|
||||
this(ioExecutors, fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, 5, fileFactory, filePrefix, fileExtension, maxAIO, userVersion);
|
||||
}
|
||||
|
||||
public JournalImpl(final ExecutorFactory ioExecutors,
|
||||
final int fileSize,
|
||||
final int minFiles,
|
||||
final int poolSize,
|
||||
final int compactMinFiles,
|
||||
final int compactPercentage,
|
||||
final int journalFileOpenTimeout,
|
||||
final SequentialFileFactory fileFactory,
|
||||
final String filePrefix,
|
||||
final String fileExtension,
|
||||
final int maxAIO,
|
||||
final int userVersion) {
|
||||
|
||||
super(fileFactory.isSupportsCallbacks(), fileSize);
|
||||
|
||||
|
@ -275,7 +304,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
|
||||
this.fileFactory = fileFactory;
|
||||
|
||||
filesRepository = new JournalFilesRepository(fileFactory, this, filePrefix, fileExtension, userVersion, maxAIO, fileSize, minFiles, poolSize);
|
||||
filesRepository = new JournalFilesRepository(fileFactory, this, filePrefix, fileExtension, userVersion, maxAIO, fileSize, minFiles, poolSize, journalFileOpenTimeout);
|
||||
|
||||
this.userVersion = userVersion;
|
||||
}
|
||||
|
|
|
@ -619,6 +619,16 @@ public interface Configuration {
|
|||
*/
|
||||
int getJournalCompactPercentage();
|
||||
|
||||
/**
|
||||
* @return How long to wait when opening a new Journal file before failing
|
||||
*/
|
||||
int getJournalFileOpenTimeout();
|
||||
|
||||
/**
|
||||
* Sets the journal file open timeout
|
||||
*/
|
||||
Configuration setJournalFileOpenTimeout(int journalFileOpenTimeout);
|
||||
|
||||
/**
|
||||
* Sets the percentage of live data before compacting the journal.
|
||||
*/
|
||||
|
|
|
@ -167,6 +167,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
|
|||
|
||||
protected int journalCompactPercentage = ActiveMQDefaultConfiguration.getDefaultJournalCompactPercentage();
|
||||
|
||||
protected int journalFileOpenTimeout = ActiveMQDefaultConfiguration.getDefaultJournalFileOpenTimeout();
|
||||
|
||||
protected int journalFileSize = ActiveMQDefaultConfiguration.getDefaultJournalFileSize();
|
||||
|
||||
protected int journalPoolFiles = ActiveMQDefaultConfiguration.getDefaultJournalPoolFiles();
|
||||
|
@ -1113,6 +1115,17 @@ public class ConfigurationImpl implements Configuration, Serializable {
|
|||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getJournalFileOpenTimeout() {
|
||||
return journalFileOpenTimeout;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration setJournalFileOpenTimeout(int journalFileOpenTimeout) {
|
||||
this.journalFileOpenTimeout = journalFileOpenTimeout;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConfigurationImpl setJournalCompactPercentage(final int percentage) {
|
||||
journalCompactPercentage = percentage;
|
||||
|
|
|
@ -510,6 +510,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
config.setJournalMaxIO_NIO(journalMaxIO);
|
||||
}
|
||||
|
||||
config.setJournalFileOpenTimeout(getInteger(e, "journal-file-open-timeout", ActiveMQDefaultConfiguration.getDefaultJournalFileOpenTimeout(), Validators.GT_ZERO));
|
||||
|
||||
config.setJournalMinFiles(getInteger(e, "journal-min-files", config.getJournalMinFiles(), Validators.GT_ZERO));
|
||||
|
||||
config.setJournalPoolFiles(getInteger(e, "journal-pool-files", config.getJournalPoolFiles(), Validators.MINUS_ONE_OR_GT_ZERO));
|
||||
|
|
|
@ -121,7 +121,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
|
|||
bindingsFF = new NIOSequentialFileFactory(config.getBindingsLocation(), criticalErrorListener, config.getJournalMaxIO_NIO());
|
||||
bindingsFF.setDatasync(config.isJournalDatasync());
|
||||
|
||||
Journal localBindings = new JournalImpl(ioExecutors, 1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactPercentage(), bindingsFF, "activemq-bindings", "bindings", 1, 0);
|
||||
Journal localBindings = new JournalImpl(ioExecutors, 1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), bindingsFF, "activemq-bindings", "bindings", 1, 0);
|
||||
|
||||
bindingsJournal = localBindings;
|
||||
originalBindingsJournal = localBindings;
|
||||
|
|
|
@ -688,6 +688,14 @@
|
|||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="journal-file-open-timeout" type="xsd:int" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
the length of time to wait when opening a new Journal file before timing out and failing
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="server-dump-interval" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
|
|
|
@ -93,6 +93,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
|
|||
Assert.assertEquals(10000, conf.getJournalBufferSize_NIO());
|
||||
Assert.assertEquals(1000, conf.getJournalBufferTimeout_NIO());
|
||||
Assert.assertEquals(56546, conf.getJournalMaxIO_NIO());
|
||||
Assert.assertEquals(9876, conf.getJournalFileOpenTimeout());
|
||||
|
||||
Assert.assertEquals(false, conf.isJournalSyncTransactional());
|
||||
Assert.assertEquals(true, conf.isJournalSyncNonTransactional());
|
||||
|
|
|
@ -231,6 +231,7 @@
|
|||
<journal-compact-percentage>33</journal-compact-percentage>
|
||||
<journal-compact-min-files>123</journal-compact-min-files>
|
||||
<journal-max-io>56546</journal-max-io>
|
||||
<journal-file-open-timeout>9876</journal-file-open-timeout>
|
||||
<perf-blast-pages>5</perf-blast-pages>
|
||||
<run-sync-speed-test>true</run-sync-speed-test>
|
||||
<server-dump-interval>5000</server-dump-interval>
|
||||
|
|
Loading…
Reference in New Issue