This commit is contained in:
Clebert Suconic 2019-11-05 17:01:41 -05:00
commit 2e31e04bb4
15 changed files with 111 additions and 3 deletions

View File

@ -109,6 +109,7 @@ public class Create extends InputAbstract {
public static final String ETC_GLOBAL_MAX_SPECIFIED_TXT = "etc/global-max-specified.txt";
public static final String ETC_GLOBAL_MAX_DEFAULT_TXT = "etc/global-max-default.txt";
public static final String ETC_PAGE_SYNC_SETTINGS = "etc/page-sync-settings.txt";
public static final String ETC_JOLOKIA_ACCESS_XML = "jolokia-access.xml";
@Arguments(description = "The instance directory to hold the broker's configuration and data. Path must be writable.", required = true)
@ -1002,6 +1003,7 @@ public class Create extends InputAbstract {
private void performAutoTune(HashMap<String, String> filters, JournalType journalType, File dataFolder) {
if (noAutoTune) {
filters.put("${journal-buffer.settings}", "");
filters.put("${page-sync.settings}", "");
} else {
try {
int writes = 250;
@ -1018,6 +1020,7 @@ public class Create extends InputAbstract {
filters.put("${journal-buffer.settings}", readTextFile(ETC_JOURNAL_BUFFER_SETTINGS, syncFilter));
filters.put("${page-sync.settings}", readTextFile(ETC_PAGE_SYNC_SETTINGS, syncFilter));
} else {
long time = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, !noJournalSync, false, "journal-test.tmp", ActiveMQDefaultConfiguration.getDefaultJournalMaxIoAio(), journalType);
long nanoseconds = SyncCalculation.toNanos(time, writes, verbose);
@ -1034,11 +1037,23 @@ public class Create extends InputAbstract {
" writes per millisecond, your journal-buffer-timeout will be " + nanoseconds);
filters.put("${journal-buffer.settings}", readTextFile(ETC_JOURNAL_BUFFER_SETTINGS, syncFilter));
if (noJournalSync) {
syncFilter.put("${nanoseconds}", "0");
} else if (journalType != JournalType.NIO) {
long nioTime = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, !noJournalSync, false, "journal-test.tmp", ActiveMQDefaultConfiguration.getDefaultJournalMaxIoAio(), JournalType.NIO);
long nioNanoseconds = SyncCalculation.toNanos(nioTime, writes, verbose);
syncFilter.put("${nanoseconds}", Long.toString(nioNanoseconds));
}
filters.put("${page-sync.settings}", readTextFile(ETC_PAGE_SYNC_SETTINGS, syncFilter));
}
} catch (Exception e) {
filters.put("${journal-buffer.settings}", "");
filters.put("${page-sync.settings}", "");
e.printStackTrace();
System.err.println("Couldn't perform sync calculation, using default values");
}

View File

@ -73,7 +73,9 @@ ${jdbc}
<critical-analyzer-policy>HALT</critical-analyzer-policy>
${global-max-section}
${page-sync.settings}
${global-max-section}
<acceptors>
<!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->

View File

@ -0,0 +1,2 @@
<page-sync-timeout>${nanoseconds}</page-sync-timeout>

View File

@ -884,6 +884,27 @@ public class ArtemisTest extends CliTestBase {
}
@Test
public void testAutoTune() throws Exception {
File instanceFolder = temporaryFolder.newFolder("autoTuneTest");
setupAuth(instanceFolder);
// This is usually set when run from the command line via artemis.profile
Run.setEmbedded(true);
Artemis.main("create", instanceFolder.getAbsolutePath(), "--force", "--silent", "--no-web", "--require-login");
System.setProperty("artemis.instance", instanceFolder.getAbsolutePath());
FileConfiguration fc = new FileConfiguration();
FileDeploymentManager deploymentManager = new FileDeploymentManager(new File(instanceFolder, "./etc/broker.xml").toURI().toString());
deploymentManager.addDeployable(fc);
fc.setPageSyncTimeout(-1);
deploymentManager.readConfiguration();
Assert.assertNotEquals(-1, fc.getPageSyncTimeout());
}
@Test
public void testQstat() throws Exception {

View File

@ -1193,6 +1193,18 @@ public interface Configuration {
String getInternalNamingPrefix();
/**
* Returns the timeout (in nanoseconds) used to sync pages.
* <br>
* Default value is {@link org.apache.activemq.artemis.ArtemisConstants#DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO}.
*/
int getPageSyncTimeout();
/**
* Sets the timeout (in nanoseconds) used to sync pages.
*/
Configuration setPageSyncTimeout(int pageSyncTimeout);
/**
* @param plugins
*/

View File

@ -338,6 +338,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
private long criticalAnalyzerCheckPeriod = 0; // non set
private int pageSyncTimeout = ActiveMQDefaultConfiguration.getDefaultJournalBufferTimeoutNio();
/**
* Parent folder for all data folders.
*/
@ -2348,6 +2350,17 @@ public class ConfigurationImpl implements Configuration, Serializable {
return this;
}
@Override
public int getPageSyncTimeout() {
return pageSyncTimeout;
}
@Override
public ConfigurationImpl setPageSyncTimeout(final int pageSyncTimeout) {
this.pageSyncTimeout = pageSyncTimeout;
return this;
}
public static boolean checkoutDupCacheSize(final int windowSize, final int idCacheSize) {
final int msgNumInFlight = windowSize / DEFAULT_JMS_MESSAGE_SIZE;

View File

@ -677,6 +677,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
config.setCriticalAnalyzerPolicy(CriticalAnalyzerPolicy.valueOf(getString(e, "critical-analyzer-policy", config.getCriticalAnalyzerPolicy().name(), Validators.NOT_NULL_OR_EMPTY)));
config.setPageSyncTimeout(getInteger(e, "page-sync-timeout", config.getJournalBufferTimeout_NIO(), Validators.GE_ZERO));
parseAddressSettings(e, config);
parseResourceLimits(e, config);

View File

@ -2603,9 +2603,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
protected PagingStoreFactory getPagingStoreFactory() throws Exception {
if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) configuration.getStoreConfiguration();
return new PagingStoreFactoryDatabase(dbConf, storageManager, configuration.getJournalBufferTimeout_NIO(), scheduledPool, ioExecutorFactory, false, shutdownOnCriticalIO, configuration.isReadWholePage());
return new PagingStoreFactoryDatabase(dbConf, storageManager, configuration.getPageSyncTimeout(), scheduledPool, ioExecutorFactory, false, shutdownOnCriticalIO, configuration.isReadWholePage());
}
return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getJournalBufferTimeout_NIO(), scheduledPool, ioExecutorFactory, configuration.isJournalSyncNonTransactional(), shutdownOnCriticalIO, configuration.isReadWholePage());
return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getPageSyncTimeout(), scheduledPool, ioExecutorFactory, configuration.isJournalSyncNonTransactional(), shutdownOnCriticalIO, configuration.isReadWholePage());
}
/**

View File

@ -888,6 +888,15 @@
</xsd:simpleType>
</xsd:element>
<xsd:element name="page-sync-timeout" type="xsd:int" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
The timeout (in nanoseconds) used to sync pages. The exact default value
depend on whether the journal is ASYNCIO or NIO.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="security-settings" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>

View File

@ -89,6 +89,7 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
Assert.assertEquals(conf.getJournalLocation(), conf.getNodeManagerLockLocation());
Assert.assertNull(conf.getJournalDeviceBlockSize());
Assert.assertEquals(ActiveMQDefaultConfiguration.isDefaultReadWholePage(), conf.isReadWholePage());
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalBufferTimeoutNio(), conf.getPageSyncTimeout());
}
@Test
@ -277,6 +278,10 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
s = RandomUtil.randomString();
conf.setClusterPassword(s);
Assert.assertEquals(s, conf.getClusterPassword());
i = RandomUtil.randomInt();
conf.setPageSyncTimeout(i);
Assert.assertEquals(i, conf.getPageSyncTimeout());
}
}
@ -480,6 +485,10 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
conf.setClusterPassword(s);
Assert.assertEquals(s, conf.getClusterPassword());
i = RandomUtil.randomInt();
conf.setPageSyncTimeout(i);
Assert.assertEquals(i, conf.getPageSyncTimeout());
conf.registerBrokerPlugin(new LoggingActiveMQServerPlugin());
Assert.assertEquals("ensure one plugin registered", 1, conf.getBrokerPlugins().size());

View File

@ -137,6 +137,8 @@ public class DefaultsFileConfigurationTest extends ConfigurationImplTest {
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultGracefulShutdownTimeout(), conf.getGracefulShutdownTimeout());
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultAmqpUseCoreSubscriptionNaming(), conf.isAmqpUseCoreSubscriptionNaming());
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalBufferTimeoutNio(), conf.getPageSyncTimeout());
}
// Protected ---------------------------------------------------------------------------------------------

View File

@ -351,6 +351,18 @@ public class FileConfigurationParserTest extends ActiveMQTestBase {
assertEquals(expected, addressSettings.getMaxSizeBytesRejectThreshold());
}
@Test
public void testParsingPageSyncTimeout() throws Exception {
int expected = 1000;
FileConfigurationParser parser = new FileConfigurationParser();
String configStr = firstPart + String.format("<page-sync-timeout>%d</page-sync-timeout>\n", expected) + lastPart;
ByteArrayInputStream input = new ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8));
Configuration config = parser.parseMainConfig(input);
assertEquals(expected, config.getPageSyncTimeout());
}
private static String firstPart = "<core xmlns=\"urn:activemq:core\">" + "\n" +
"<name>ActiveMQ.main.config</name>" + "\n" +
"<log-delegate-factory-class-name>org.apache.activemq.artemis.integration.logging.Log4jLogDelegateFactory</log-delegate-factory-class-name>" + "\n" +

View File

@ -369,6 +369,7 @@
<network-check-URL-list>www.apache.org</network-check-URL-list>
<network-check-ping-command>ping-four</network-check-ping-command>
<network-check-ping6-command>ping-six</network-check-ping6-command>
<page-sync-timeout>1000</page-sync-timeout>
<security-settings>
<security-setting match="a1">
<permission type="createNonDurableQueue" roles="a1.1"/>

View File

@ -159,6 +159,7 @@ log-delegate-factory-class-name | **deprecated** the name of the factory class t
name | node name; used in topology notifications if set. | n/a
[password-codec](masking-passwords.md) | the name of the class (and optional configuration properties) used to decode masked passwords. Only valid when `mask-password` is `true`. | n/a
[page-max-concurrent-io](paging.md) | The max number of concurrent reads allowed on paging. | 5
[page-sync-timeout](paging.md#page-sync-timeout) | The time in nanoseconds a page will be synced. | 3333333 for ASYNCIO; `journal-buffer-timeout` for NIO
[read-whole-page](paging.md) | If true the whole page would be read, otherwise just seek and read while getting message. | `false`
[paging-directory](paging.md#configuration)| the directory to store paged messages in. | `data/paging`
[persist-delivery-count-before-delivery](undelivered-messages.md#delivery-count-persistence) | True means that the delivery count is persisted before delivery. False means that this only happens after a message has been cancelled. | `false`

View File

@ -171,6 +171,13 @@ Once that limit is reached any message will be blocked. (unless the protocol
doesn't support flow control on which case there will be an exception thrown
and the connection for those clients dropped).
## Page Sync Timeout
The pages are synced periodically and the sync period is configured through
`page-sync-timeout` in nanoseconds. When using NIO journal, by default has
the same value of `journal-buffer-timeout`. When using ASYNCIO, the default
should be `3333333`.
## Example
See the [Paging Example](examples.md#paging) which shows how to use paging with