This commit is contained in:
Clebert Suconic 2017-06-22 10:48:10 -04:00
commit 902e0c49ab
10 changed files with 84 additions and 9 deletions

View File

@ -261,6 +261,9 @@ public final class ActiveMQDefaultConfiguration {
// The percentage of live data on which we consider compacting the journal // The percentage of live data on which we consider compacting the journal
private static int DEFAULT_JOURNAL_COMPACT_PERCENTAGE = 30; private static int DEFAULT_JOURNAL_COMPACT_PERCENTAGE = 30;
// The time to wait when opening a new journal file before failing
private static int DEFAULT_JOURNAL_FILE_OPEN_TIMEOUT = 5;
// The minimal number of data files before we can start compacting // The minimal number of data files before we can start compacting
private static int DEFAULT_JOURNAL_COMPACT_MIN_FILES = 10; private static int DEFAULT_JOURNAL_COMPACT_MIN_FILES = 10;
@ -796,6 +799,10 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_JOURNAL_COMPACT_PERCENTAGE; return DEFAULT_JOURNAL_COMPACT_PERCENTAGE;
} }
public static int getDefaultJournalFileOpenTimeout() {
return DEFAULT_JOURNAL_FILE_OPEN_TIMEOUT;
}
/** /**
* The minimal number of data files before we can start compacting * The minimal number of data files before we can start compacting
*/ */

View File

@ -81,6 +81,8 @@ public class JournalFilesRepository {
private final AtomicInteger freeFilesCount = new AtomicInteger(0); private final AtomicInteger freeFilesCount = new AtomicInteger(0);
private final int journalFileOpenTimeout;
private Executor openFilesExecutor; private Executor openFilesExecutor;
private final Runnable pushOpenRunnable = new Runnable() { private final Runnable pushOpenRunnable = new Runnable() {
@ -103,7 +105,8 @@ public class JournalFilesRepository {
final int maxAIO, final int maxAIO,
final int fileSize, final int fileSize,
final int minFiles, final int minFiles,
final int poolSize) { final int poolSize,
final int journalFileOpenTimeout) {
if (filePrefix == null) { if (filePrefix == null) {
throw new IllegalArgumentException("filePrefix cannot be null"); throw new IllegalArgumentException("filePrefix cannot be null");
} }
@ -122,6 +125,7 @@ public class JournalFilesRepository {
this.poolSize = poolSize; this.poolSize = poolSize;
this.userVersion = userVersion; this.userVersion = userVersion;
this.journal = journal; this.journal = journal;
this.journalFileOpenTimeout = journalFileOpenTimeout;
} }
// Public -------------------------------------------------------- // Public --------------------------------------------------------
@ -421,7 +425,7 @@ public class JournalFilesRepository {
pushOpen(); pushOpen();
nextFile = openedFiles.poll(5, TimeUnit.SECONDS); nextFile = openedFiles.poll(journalFileOpenTimeout, TimeUnit.SECONDS);
} }
if (openedFiles.isEmpty()) { if (openedFiles.isEmpty()) {
@ -431,7 +435,7 @@ public class JournalFilesRepository {
if (nextFile == null) { if (nextFile == null) {
logger.debug("Could not get a file in 5 seconds, it will retry directly, without an executor"); logger.debug("Could not get a file in " + journalFileOpenTimeout + " seconds, it will retry directly, without an executor");
try { try {
nextFile = takeFile(true, true, true, false); nextFile = takeFile(true, true, true, false);
} catch (Exception e) { } catch (Exception e) {

View File

@ -234,7 +234,21 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final String fileExtension, final String fileExtension,
final int maxAIO, final int maxAIO,
final int userVersion) { final int userVersion) {
this(null, fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, fileFactory, filePrefix, fileExtension, maxAIO, userVersion); this(null, fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, 5, fileFactory, filePrefix, fileExtension, maxAIO, userVersion);
}
public JournalImpl(final int fileSize,
final int minFiles,
final int poolSize,
final int compactMinFiles,
final int compactPercentage,
final int journalFileOpenTimeout,
final SequentialFileFactory fileFactory,
final String filePrefix,
final String fileExtension,
final int maxAIO,
final int userVersion) {
this(null, fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, journalFileOpenTimeout, fileFactory, filePrefix, fileExtension, maxAIO, userVersion);
} }
public JournalImpl(final ExecutorFactory ioExecutors, public JournalImpl(final ExecutorFactory ioExecutors,
@ -248,6 +262,21 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final String fileExtension, final String fileExtension,
final int maxAIO, final int maxAIO,
final int userVersion) { final int userVersion) {
this(ioExecutors, fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, 5, fileFactory, filePrefix, fileExtension, maxAIO, userVersion);
}
public JournalImpl(final ExecutorFactory ioExecutors,
final int fileSize,
final int minFiles,
final int poolSize,
final int compactMinFiles,
final int compactPercentage,
final int journalFileOpenTimeout,
final SequentialFileFactory fileFactory,
final String filePrefix,
final String fileExtension,
final int maxAIO,
final int userVersion) {
super(fileFactory.isSupportsCallbacks(), fileSize); super(fileFactory.isSupportsCallbacks(), fileSize);
@ -275,7 +304,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
this.fileFactory = fileFactory; this.fileFactory = fileFactory;
filesRepository = new JournalFilesRepository(fileFactory, this, filePrefix, fileExtension, userVersion, maxAIO, fileSize, minFiles, poolSize); filesRepository = new JournalFilesRepository(fileFactory, this, filePrefix, fileExtension, userVersion, maxAIO, fileSize, minFiles, poolSize, journalFileOpenTimeout);
this.userVersion = userVersion; this.userVersion = userVersion;
} }

View File

@ -645,6 +645,16 @@ public interface Configuration {
*/ */
int getJournalCompactPercentage(); int getJournalCompactPercentage();
/**
* @return How long to wait when opening a new Journal file before failing
*/
int getJournalFileOpenTimeout();
/**
* Sets the journal file open timeout
*/
Configuration setJournalFileOpenTimeout(int journalFileOpenTimeout);
/** /**
* Sets the percentage of live data before compacting the journal. * Sets the percentage of live data before compacting the journal.
*/ */

View File

@ -173,6 +173,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
protected int journalCompactPercentage = ActiveMQDefaultConfiguration.getDefaultJournalCompactPercentage(); protected int journalCompactPercentage = ActiveMQDefaultConfiguration.getDefaultJournalCompactPercentage();
protected int journalFileOpenTimeout = ActiveMQDefaultConfiguration.getDefaultJournalFileOpenTimeout();
protected int journalFileSize = ActiveMQDefaultConfiguration.getDefaultJournalFileSize(); protected int journalFileSize = ActiveMQDefaultConfiguration.getDefaultJournalFileSize();
protected int journalPoolFiles = ActiveMQDefaultConfiguration.getDefaultJournalPoolFiles(); protected int journalPoolFiles = ActiveMQDefaultConfiguration.getDefaultJournalPoolFiles();
@ -1145,6 +1147,17 @@ public class ConfigurationImpl implements Configuration, Serializable {
return this; return this;
} }
@Override
public int getJournalFileOpenTimeout() {
return journalFileOpenTimeout;
}
@Override
public Configuration setJournalFileOpenTimeout(int journalFileOpenTimeout) {
this.journalFileOpenTimeout = journalFileOpenTimeout;
return this;
}
@Override @Override
public ConfigurationImpl setJournalCompactPercentage(final int percentage) { public ConfigurationImpl setJournalCompactPercentage(final int percentage) {
journalCompactPercentage = percentage; journalCompactPercentage = percentage;

View File

@ -561,6 +561,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
config.setJournalMaxIO_NIO(journalMaxIO); config.setJournalMaxIO_NIO(journalMaxIO);
} }
config.setJournalFileOpenTimeout(getInteger(e, "journal-file-open-timeout", ActiveMQDefaultConfiguration.getDefaultJournalFileOpenTimeout(), Validators.GT_ZERO));
config.setJournalMinFiles(getInteger(e, "journal-min-files", config.getJournalMinFiles(), Validators.GT_ZERO)); config.setJournalMinFiles(getInteger(e, "journal-min-files", config.getJournalMinFiles(), Validators.GT_ZERO));
config.setJournalPoolFiles(getInteger(e, "journal-pool-files", config.getJournalPoolFiles(), Validators.MINUS_ONE_OR_GT_ZERO)); config.setJournalPoolFiles(getInteger(e, "journal-pool-files", config.getJournalPoolFiles(), Validators.MINUS_ONE_OR_GT_ZERO));

View File

@ -120,7 +120,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
bindingsFF = new NIOSequentialFileFactory(config.getBindingsLocation(), criticalErrorListener, config.getJournalMaxIO_NIO()); bindingsFF = new NIOSequentialFileFactory(config.getBindingsLocation(), criticalErrorListener, config.getJournalMaxIO_NIO());
bindingsFF.setDatasync(config.isJournalDatasync()); bindingsFF.setDatasync(config.isJournalDatasync());
Journal localBindings = new JournalImpl(ioExecutors, 1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactPercentage(), bindingsFF, "activemq-bindings", "bindings", 1, 0); Journal localBindings = new JournalImpl(ioExecutors, 1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), bindingsFF, "activemq-bindings", "bindings", 1, 0);
bindingsJournal = localBindings; bindingsJournal = localBindings;
originalBindingsJournal = localBindings; originalBindingsJournal = localBindings;

View File

@ -692,6 +692,14 @@
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
<xsd:element name="journal-file-open-timeout" type="xsd:int" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the length of time to wait when opening a new Journal file before timing out and failing
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="server-dump-interval" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0"> <xsd:element name="server-dump-interval" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
<xsd:annotation> <xsd:annotation>
<xsd:documentation> <xsd:documentation>
@ -764,7 +772,7 @@
a list of security settings a list of security settings
</xsd:documentation> </xsd:documentation>
</xsd:annotation> </xsd:annotation>
<xsd:complexType> <xsd:complexType>
<xsd:sequence> <xsd:sequence>
<xsd:choice> <xsd:choice>
@ -862,7 +870,7 @@
</xsd:sequence> </xsd:sequence>
</xsd:complexType> </xsd:complexType>
</xsd:element> </xsd:element>
<xsd:element name="broker-plugins" maxOccurs="1" minOccurs="0"> <xsd:element name="broker-plugins" maxOccurs="1" minOccurs="0">
<xsd:annotation> <xsd:annotation>
<xsd:documentation> <xsd:documentation>
@ -886,7 +894,7 @@
</xsd:annotation> </xsd:annotation>
</xsd:attribute> </xsd:attribute>
</xsd:complexType> </xsd:complexType>
</xsd:element> </xsd:element>
</xsd:sequence> </xsd:sequence>
</xsd:complexType> </xsd:complexType>
</xsd:element> </xsd:element>

View File

@ -101,6 +101,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
Assert.assertEquals(10000, conf.getJournalBufferSize_NIO()); Assert.assertEquals(10000, conf.getJournalBufferSize_NIO());
Assert.assertEquals(1000, conf.getJournalBufferTimeout_NIO()); Assert.assertEquals(1000, conf.getJournalBufferTimeout_NIO());
Assert.assertEquals(56546, conf.getJournalMaxIO_NIO()); Assert.assertEquals(56546, conf.getJournalMaxIO_NIO());
Assert.assertEquals(9876, conf.getJournalFileOpenTimeout());
Assert.assertEquals(false, conf.isJournalSyncTransactional()); Assert.assertEquals(false, conf.isJournalSyncTransactional());
Assert.assertEquals(true, conf.isJournalSyncNonTransactional()); Assert.assertEquals(true, conf.isJournalSyncNonTransactional());

View File

@ -232,6 +232,7 @@
<journal-compact-percentage>33</journal-compact-percentage> <journal-compact-percentage>33</journal-compact-percentage>
<journal-compact-min-files>123</journal-compact-min-files> <journal-compact-min-files>123</journal-compact-min-files>
<journal-max-io>56546</journal-max-io> <journal-max-io>56546</journal-max-io>
<journal-file-open-timeout>9876</journal-file-open-timeout>
<server-dump-interval>5000</server-dump-interval> <server-dump-interval>5000</server-dump-interval>
<memory-warning-threshold>95</memory-warning-threshold> <memory-warning-threshold>95</memory-warning-threshold>
<memory-measure-interval>54321</memory-measure-interval> <memory-measure-interval>54321</memory-measure-interval>