ARTEMIS-2466 PageSyncTimer::timeSync isn't configurable using ASYNCIO
Add the config parameter `page-sync-timeout` to set a customized value, because if the broker is configured to use ASYNCIO journal, the timeout has the same value of NIO default journal buffer timeout ie 3333333.
This commit is contained in:
parent
c8a165eeb6
commit
f680d9f712
|
@ -109,6 +109,7 @@ public class Create extends InputAbstract {
|
|||
|
||||
public static final String ETC_GLOBAL_MAX_SPECIFIED_TXT = "etc/global-max-specified.txt";
|
||||
public static final String ETC_GLOBAL_MAX_DEFAULT_TXT = "etc/global-max-default.txt";
|
||||
public static final String ETC_PAGE_SYNC_SETTINGS = "etc/page-sync-settings.txt";
|
||||
public static final String ETC_JOLOKIA_ACCESS_XML = "jolokia-access.xml";
|
||||
|
||||
@Arguments(description = "The instance directory to hold the broker's configuration and data. Path must be writable.", required = true)
|
||||
|
@ -1002,6 +1003,7 @@ public class Create extends InputAbstract {
|
|||
private void performAutoTune(HashMap<String, String> filters, JournalType journalType, File dataFolder) {
|
||||
if (noAutoTune) {
|
||||
filters.put("${journal-buffer.settings}", "");
|
||||
filters.put("${page-sync.settings}", "");
|
||||
} else {
|
||||
try {
|
||||
int writes = 250;
|
||||
|
@ -1018,6 +1020,7 @@ public class Create extends InputAbstract {
|
|||
|
||||
filters.put("${journal-buffer.settings}", readTextFile(ETC_JOURNAL_BUFFER_SETTINGS, syncFilter));
|
||||
|
||||
filters.put("${page-sync.settings}", readTextFile(ETC_PAGE_SYNC_SETTINGS, syncFilter));
|
||||
} else {
|
||||
long time = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, !noJournalSync, false, "journal-test.tmp", ActiveMQDefaultConfiguration.getDefaultJournalMaxIoAio(), journalType);
|
||||
long nanoseconds = SyncCalculation.toNanos(time, writes, verbose);
|
||||
|
@ -1034,11 +1037,23 @@ public class Create extends InputAbstract {
|
|||
" writes per millisecond, your journal-buffer-timeout will be " + nanoseconds);
|
||||
|
||||
filters.put("${journal-buffer.settings}", readTextFile(ETC_JOURNAL_BUFFER_SETTINGS, syncFilter));
|
||||
|
||||
if (noJournalSync) {
|
||||
syncFilter.put("${nanoseconds}", "0");
|
||||
} else if (journalType != JournalType.NIO) {
|
||||
long nioTime = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, !noJournalSync, false, "journal-test.tmp", ActiveMQDefaultConfiguration.getDefaultJournalMaxIoAio(), JournalType.NIO);
|
||||
long nioNanoseconds = SyncCalculation.toNanos(nioTime, writes, verbose);
|
||||
syncFilter.put("${nanoseconds}", Long.toString(nioNanoseconds));
|
||||
}
|
||||
|
||||
filters.put("${page-sync.settings}", readTextFile(ETC_PAGE_SYNC_SETTINGS, syncFilter));
|
||||
}
|
||||
|
||||
|
||||
|
||||
} catch (Exception e) {
|
||||
filters.put("${journal-buffer.settings}", "");
|
||||
filters.put("${page-sync.settings}", "");
|
||||
e.printStackTrace();
|
||||
System.err.println("Couldn't perform sync calculation, using default values");
|
||||
}
|
||||
|
|
|
@ -73,7 +73,9 @@ ${jdbc}
|
|||
|
||||
<critical-analyzer-policy>HALT</critical-analyzer-policy>
|
||||
|
||||
${global-max-section}
|
||||
${page-sync.settings}
|
||||
|
||||
${global-max-section}
|
||||
<acceptors>
|
||||
|
||||
<!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
|
||||
<page-sync-timeout>${nanoseconds}</page-sync-timeout>
|
|
@ -884,6 +884,27 @@ public class ArtemisTest extends CliTestBase {
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAutoTune() throws Exception {
|
||||
File instanceFolder = temporaryFolder.newFolder("autoTuneTest");
|
||||
|
||||
setupAuth(instanceFolder);
|
||||
|
||||
// This is usually set when run from the command line via artemis.profile
|
||||
Run.setEmbedded(true);
|
||||
Artemis.main("create", instanceFolder.getAbsolutePath(), "--force", "--silent", "--no-web", "--require-login");
|
||||
System.setProperty("artemis.instance", instanceFolder.getAbsolutePath());
|
||||
|
||||
FileConfiguration fc = new FileConfiguration();
|
||||
FileDeploymentManager deploymentManager = new FileDeploymentManager(new File(instanceFolder, "./etc/broker.xml").toURI().toString());
|
||||
deploymentManager.addDeployable(fc);
|
||||
|
||||
fc.setPageSyncTimeout(-1);
|
||||
deploymentManager.readConfiguration();
|
||||
|
||||
Assert.assertNotEquals(-1, fc.getPageSyncTimeout());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testQstat() throws Exception {
|
||||
|
||||
|
|
|
@ -1193,6 +1193,18 @@ public interface Configuration {
|
|||
|
||||
String getInternalNamingPrefix();
|
||||
|
||||
/**
|
||||
* Returns the timeout (in nanoseconds) used to sync pages.
|
||||
* <br>
|
||||
* Default value is {@link org.apache.activemq.artemis.ArtemisConstants#DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO}.
|
||||
*/
|
||||
int getPageSyncTimeout();
|
||||
|
||||
/**
|
||||
* Sets the timeout (in nanoseconds) used to sync pages.
|
||||
*/
|
||||
Configuration setPageSyncTimeout(int pageSyncTimeout);
|
||||
|
||||
/**
|
||||
* @param plugins
|
||||
*/
|
||||
|
|
|
@ -338,6 +338,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
|
|||
|
||||
private long criticalAnalyzerCheckPeriod = 0; // non set
|
||||
|
||||
private int pageSyncTimeout = ActiveMQDefaultConfiguration.getDefaultJournalBufferTimeoutNio();
|
||||
|
||||
/**
|
||||
* Parent folder for all data folders.
|
||||
*/
|
||||
|
@ -2348,6 +2350,17 @@ public class ConfigurationImpl implements Configuration, Serializable {
|
|||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPageSyncTimeout() {
|
||||
return pageSyncTimeout;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConfigurationImpl setPageSyncTimeout(final int pageSyncTimeout) {
|
||||
this.pageSyncTimeout = pageSyncTimeout;
|
||||
return this;
|
||||
}
|
||||
|
||||
public static boolean checkoutDupCacheSize(final int windowSize, final int idCacheSize) {
|
||||
final int msgNumInFlight = windowSize / DEFAULT_JMS_MESSAGE_SIZE;
|
||||
|
||||
|
|
|
@ -677,6 +677,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
config.setCriticalAnalyzerPolicy(CriticalAnalyzerPolicy.valueOf(getString(e, "critical-analyzer-policy", config.getCriticalAnalyzerPolicy().name(), Validators.NOT_NULL_OR_EMPTY)));
|
||||
|
||||
config.setPageSyncTimeout(getInteger(e, "page-sync-timeout", config.getJournalBufferTimeout_NIO(), Validators.GE_ZERO));
|
||||
|
||||
parseAddressSettings(e, config);
|
||||
|
||||
parseResourceLimits(e, config);
|
||||
|
|
|
@ -2603,9 +2603,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
protected PagingStoreFactory getPagingStoreFactory() throws Exception {
|
||||
if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
|
||||
DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) configuration.getStoreConfiguration();
|
||||
return new PagingStoreFactoryDatabase(dbConf, storageManager, configuration.getJournalBufferTimeout_NIO(), scheduledPool, ioExecutorFactory, false, shutdownOnCriticalIO, configuration.isReadWholePage());
|
||||
return new PagingStoreFactoryDatabase(dbConf, storageManager, configuration.getPageSyncTimeout(), scheduledPool, ioExecutorFactory, false, shutdownOnCriticalIO, configuration.isReadWholePage());
|
||||
}
|
||||
return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getJournalBufferTimeout_NIO(), scheduledPool, ioExecutorFactory, configuration.isJournalSyncNonTransactional(), shutdownOnCriticalIO, configuration.isReadWholePage());
|
||||
return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getPageSyncTimeout(), scheduledPool, ioExecutorFactory, configuration.isJournalSyncNonTransactional(), shutdownOnCriticalIO, configuration.isReadWholePage());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -888,6 +888,15 @@
|
|||
</xsd:simpleType>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="page-sync-timeout" type="xsd:int" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
The timeout (in nanoseconds) used to sync pages. The exact default value
|
||||
depend on whether the journal is ASYNCIO or NIO.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="security-settings" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
|
|
|
@ -89,6 +89,7 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
|
|||
Assert.assertEquals(conf.getJournalLocation(), conf.getNodeManagerLockLocation());
|
||||
Assert.assertNull(conf.getJournalDeviceBlockSize());
|
||||
Assert.assertEquals(ActiveMQDefaultConfiguration.isDefaultReadWholePage(), conf.isReadWholePage());
|
||||
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalBufferTimeoutNio(), conf.getPageSyncTimeout());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -277,6 +278,10 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
|
|||
s = RandomUtil.randomString();
|
||||
conf.setClusterPassword(s);
|
||||
Assert.assertEquals(s, conf.getClusterPassword());
|
||||
|
||||
i = RandomUtil.randomInt();
|
||||
conf.setPageSyncTimeout(i);
|
||||
Assert.assertEquals(i, conf.getPageSyncTimeout());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -480,6 +485,10 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
|
|||
conf.setClusterPassword(s);
|
||||
Assert.assertEquals(s, conf.getClusterPassword());
|
||||
|
||||
i = RandomUtil.randomInt();
|
||||
conf.setPageSyncTimeout(i);
|
||||
Assert.assertEquals(i, conf.getPageSyncTimeout());
|
||||
|
||||
conf.registerBrokerPlugin(new LoggingActiveMQServerPlugin());
|
||||
Assert.assertEquals("ensure one plugin registered", 1, conf.getBrokerPlugins().size());
|
||||
|
||||
|
|
|
@ -137,6 +137,8 @@ public class DefaultsFileConfigurationTest extends ConfigurationImplTest {
|
|||
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultGracefulShutdownTimeout(), conf.getGracefulShutdownTimeout());
|
||||
|
||||
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultAmqpUseCoreSubscriptionNaming(), conf.isAmqpUseCoreSubscriptionNaming());
|
||||
|
||||
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalBufferTimeoutNio(), conf.getPageSyncTimeout());
|
||||
}
|
||||
|
||||
// Protected ---------------------------------------------------------------------------------------------
|
||||
|
|
|
@ -351,6 +351,18 @@ public class FileConfigurationParserTest extends ActiveMQTestBase {
|
|||
assertEquals(expected, addressSettings.getMaxSizeBytesRejectThreshold());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParsingPageSyncTimeout() throws Exception {
|
||||
int expected = 1000;
|
||||
FileConfigurationParser parser = new FileConfigurationParser();
|
||||
|
||||
String configStr = firstPart + String.format("<page-sync-timeout>%d</page-sync-timeout>\n", expected) + lastPart;
|
||||
ByteArrayInputStream input = new ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
Configuration config = parser.parseMainConfig(input);
|
||||
assertEquals(expected, config.getPageSyncTimeout());
|
||||
}
|
||||
|
||||
private static String firstPart = "<core xmlns=\"urn:activemq:core\">" + "\n" +
|
||||
"<name>ActiveMQ.main.config</name>" + "\n" +
|
||||
"<log-delegate-factory-class-name>org.apache.activemq.artemis.integration.logging.Log4jLogDelegateFactory</log-delegate-factory-class-name>" + "\n" +
|
||||
|
|
|
@ -369,6 +369,7 @@
|
|||
<network-check-URL-list>www.apache.org</network-check-URL-list>
|
||||
<network-check-ping-command>ping-four</network-check-ping-command>
|
||||
<network-check-ping6-command>ping-six</network-check-ping6-command>
|
||||
<page-sync-timeout>1000</page-sync-timeout>
|
||||
<security-settings>
|
||||
<security-setting match="a1">
|
||||
<permission type="createNonDurableQueue" roles="a1.1"/>
|
||||
|
|
|
@ -159,6 +159,7 @@ log-delegate-factory-class-name | **deprecated** the name of the factory class t
|
|||
name | node name; used in topology notifications if set. | n/a
|
||||
[password-codec](masking-passwords.md) | the name of the class (and optional configuration properties) used to decode masked passwords. Only valid when `mask-password` is `true`. | n/a
|
||||
[page-max-concurrent-io](paging.md) | The max number of concurrent reads allowed on paging. | 5
|
||||
[page-sync-timeout](paging.md#page-sync-timeout) | The time in nanoseconds a page will be synced. | 3333333 for ASYNCIO; `journal-buffer-timeout` for NIO
|
||||
[read-whole-page](paging.md) | If true the whole page would be read, otherwise just seek and read while getting message. | `false`
|
||||
[paging-directory](paging.md#configuration)| the directory to store paged messages in. | `data/paging`
|
||||
[persist-delivery-count-before-delivery](undelivered-messages.md#delivery-count-persistence) | True means that the delivery count is persisted before delivery. False means that this only happens after a message has been cancelled. | `false`
|
||||
|
|
|
@ -171,6 +171,13 @@ Once that limit is reached any message will be blocked. (unless the protocol
|
|||
doesn't support flow control on which case there will be an exception thrown
|
||||
and the connection for those clients dropped).
|
||||
|
||||
## Page Sync Timeout
|
||||
|
||||
The pages are synced periodically and the sync period is configured through
|
||||
`page-sync-timeout` in nanoseconds. When using NIO journal, by default has
|
||||
the same value of `journal-buffer-timeout`. When using ASYNCIO, the default
|
||||
should be `3333333`.
|
||||
|
||||
## Example
|
||||
|
||||
See the [Paging Example](examples.md#paging) which shows how to use paging with
|
||||
|
|
Loading…
Reference in New Issue