ARTEMIS-370: Make JDBC Journal Sync period configurable
This commit is contained in:
parent
e2341e3a76
commit
f045ffbcf8
|
@ -447,6 +447,8 @@ public final class ActiveMQDefaultConfiguration {
|
||||||
|
|
||||||
private static final long DEFAULT_JDBC_LOCK_EXPIRATION_MILLIS = TimeUnit.SECONDS.toMillis(20);
|
private static final long DEFAULT_JDBC_LOCK_EXPIRATION_MILLIS = TimeUnit.SECONDS.toMillis(20);
|
||||||
|
|
||||||
|
private static final long DEFAULT_JDBC_JOURNAL_SYNC_PERIOD_MILLIS = 5;
|
||||||
|
|
||||||
private static final long DEFAULT_JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS = -1;
|
private static final long DEFAULT_JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS = -1;
|
||||||
|
|
||||||
// Default period to wait between connection TTL checks
|
// Default period to wait between connection TTL checks
|
||||||
|
@ -1242,6 +1244,10 @@ public final class ActiveMQDefaultConfiguration {
|
||||||
return DEFAULT_JDBC_LOCK_RENEW_PERIOD_MILLIS;
|
return DEFAULT_JDBC_LOCK_RENEW_PERIOD_MILLIS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static long getDefaultJdbcJournalSyncPeriodMillis() {
|
||||||
|
return DEFAULT_JDBC_JOURNAL_SYNC_PERIOD_MILLIS;
|
||||||
|
}
|
||||||
|
|
||||||
public static long getDefaultJdbcLockExpirationMillis() {
|
public static long getDefaultJdbcLockExpirationMillis() {
|
||||||
return DEFAULT_JDBC_LOCK_EXPIRATION_MILLIS;
|
return DEFAULT_JDBC_LOCK_EXPIRATION_MILLIS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,7 +56,9 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
private static final Logger logger = Logger.getLogger(JDBCJournalImpl.class);
|
private static final Logger logger = Logger.getLogger(JDBCJournalImpl.class);
|
||||||
|
|
||||||
// Sync Delay in ms
|
// Sync Delay in ms
|
||||||
private static final int SYNC_DELAY = 5;
|
//private static final int SYNC_DELAY = 5;
|
||||||
|
|
||||||
|
private long syncDelay;
|
||||||
|
|
||||||
private static int USER_VERSION = 1;
|
private static int USER_VERSION = 1;
|
||||||
|
|
||||||
|
@ -95,12 +97,14 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
String tableName,
|
String tableName,
|
||||||
ScheduledExecutorService scheduledExecutorService,
|
ScheduledExecutorService scheduledExecutorService,
|
||||||
Executor completeExecutor,
|
Executor completeExecutor,
|
||||||
IOCriticalErrorListener criticalIOErrorListener) {
|
IOCriticalErrorListener criticalIOErrorListener,
|
||||||
|
long syncDelay) {
|
||||||
super(dataSource, provider);
|
super(dataSource, provider);
|
||||||
records = new ArrayList<>();
|
records = new ArrayList<>();
|
||||||
this.scheduledExecutorService = scheduledExecutorService;
|
this.scheduledExecutorService = scheduledExecutorService;
|
||||||
this.completeExecutor = completeExecutor;
|
this.completeExecutor = completeExecutor;
|
||||||
this.criticalIOErrorListener = criticalIOErrorListener;
|
this.criticalIOErrorListener = criticalIOErrorListener;
|
||||||
|
this.syncDelay = syncDelay;
|
||||||
}
|
}
|
||||||
|
|
||||||
public JDBCJournalImpl(String jdbcUrl,
|
public JDBCJournalImpl(String jdbcUrl,
|
||||||
|
@ -108,18 +112,20 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
SQLProvider sqlProvider,
|
SQLProvider sqlProvider,
|
||||||
ScheduledExecutorService scheduledExecutorService,
|
ScheduledExecutorService scheduledExecutorService,
|
||||||
Executor completeExecutor,
|
Executor completeExecutor,
|
||||||
IOCriticalErrorListener criticalIOErrorListener) {
|
IOCriticalErrorListener criticalIOErrorListener,
|
||||||
|
long syncDelay) {
|
||||||
super(sqlProvider, jdbcUrl, jdbcDriverClass);
|
super(sqlProvider, jdbcUrl, jdbcDriverClass);
|
||||||
records = new ArrayList<>();
|
records = new ArrayList<>();
|
||||||
this.scheduledExecutorService = scheduledExecutorService;
|
this.scheduledExecutorService = scheduledExecutorService;
|
||||||
this.completeExecutor = completeExecutor;
|
this.completeExecutor = completeExecutor;
|
||||||
this.criticalIOErrorListener = criticalIOErrorListener;
|
this.criticalIOErrorListener = criticalIOErrorListener;
|
||||||
|
this.syncDelay = syncDelay;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void start() throws SQLException {
|
public void start() throws SQLException {
|
||||||
super.start();
|
super.start();
|
||||||
syncTimer = new JDBCJournalSync(scheduledExecutorService, completeExecutor, SYNC_DELAY, TimeUnit.MILLISECONDS, this);
|
syncTimer = new JDBCJournalSync(scheduledExecutorService, completeExecutor, syncDelay, TimeUnit.MILLISECONDS, this);
|
||||||
started = true;
|
started = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -50,6 +50,8 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
|
||||||
|
|
||||||
private long jdbcLockAcquisitionTimeoutMillis = ActiveMQDefaultConfiguration.getDefaultJdbcLockAcquisitionTimeoutMillis();
|
private long jdbcLockAcquisitionTimeoutMillis = ActiveMQDefaultConfiguration.getDefaultJdbcLockAcquisitionTimeoutMillis();
|
||||||
|
|
||||||
|
private long jdbcJournalSyncPeriodMillis = ActiveMQDefaultConfiguration.getDefaultJdbcJournalSyncPeriodMillis();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public StoreType getStoreType() {
|
public StoreType getStoreType() {
|
||||||
return StoreType.DATABASE;
|
return StoreType.DATABASE;
|
||||||
|
@ -175,4 +177,12 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
|
||||||
public void setJdbcLockAcquisitionTimeoutMillis(long jdbcLockAcquisitionTimeoutMillis) {
|
public void setJdbcLockAcquisitionTimeoutMillis(long jdbcLockAcquisitionTimeoutMillis) {
|
||||||
this.jdbcLockAcquisitionTimeoutMillis = jdbcLockAcquisitionTimeoutMillis;
|
this.jdbcLockAcquisitionTimeoutMillis = jdbcLockAcquisitionTimeoutMillis;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getJdbcJournalSyncPeriodMillis() {
|
||||||
|
return jdbcJournalSyncPeriodMillis;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setJdbcJournalSyncPeriodMillis(long jdbcJournalSyncPeriodMillis) {
|
||||||
|
this.jdbcJournalSyncPeriodMillis = jdbcJournalSyncPeriodMillis;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1472,6 +1472,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
||||||
conf.setJdbcNetworkTimeout(getInteger(storeNode, "jdbc-network-timeout", conf.getJdbcNetworkTimeout(), Validators.NO_CHECK));
|
conf.setJdbcNetworkTimeout(getInteger(storeNode, "jdbc-network-timeout", conf.getJdbcNetworkTimeout(), Validators.NO_CHECK));
|
||||||
conf.setJdbcLockRenewPeriodMillis(getLong(storeNode, "jdbc-lock-renew-period", conf.getJdbcLockRenewPeriodMillis(), Validators.NO_CHECK));
|
conf.setJdbcLockRenewPeriodMillis(getLong(storeNode, "jdbc-lock-renew-period", conf.getJdbcLockRenewPeriodMillis(), Validators.NO_CHECK));
|
||||||
conf.setJdbcLockExpirationMillis(getLong(storeNode, "jdbc-lock-expiration", conf.getJdbcLockExpirationMillis(), Validators.NO_CHECK));
|
conf.setJdbcLockExpirationMillis(getLong(storeNode, "jdbc-lock-expiration", conf.getJdbcLockExpirationMillis(), Validators.NO_CHECK));
|
||||||
|
conf.setJdbcJournalSyncPeriodMillis(getLong(storeNode, "jdbc-journal-sync-period", conf.getJdbcJournalSyncPeriodMillis(), Validators.NO_CHECK));
|
||||||
return conf;
|
return conf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -67,13 +67,13 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
|
||||||
if (sqlProviderFactory == null) {
|
if (sqlProviderFactory == null) {
|
||||||
sqlProviderFactory = new PropertySQLProvider.Factory(dbConf.getDataSource());
|
sqlProviderFactory = new PropertySQLProvider.Factory(dbConf.getDataSource());
|
||||||
}
|
}
|
||||||
bindingsJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), dbConf.getBindingsTableName(), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener);
|
bindingsJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), dbConf.getBindingsTableName(), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener,dbConf.getJdbcJournalSyncPeriodMillis());
|
||||||
messageJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), dbConf.getMessageTableName(), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener);
|
messageJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), dbConf.getMessageTableName(), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener, dbConf.getJdbcJournalSyncPeriodMillis());
|
||||||
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executorFactory.getExecutor(), criticalErrorListener);
|
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executorFactory.getExecutor(), criticalErrorListener);
|
||||||
} else {
|
} else {
|
||||||
String driverClassName = dbConf.getJdbcDriverClassName();
|
String driverClassName = dbConf.getJdbcDriverClassName();
|
||||||
bindingsJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener);
|
bindingsJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener, dbConf.getJdbcJournalSyncPeriodMillis());
|
||||||
messageJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener);
|
messageJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener, dbConf.getJdbcJournalSyncPeriodMillis());
|
||||||
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executorFactory.getExecutor(), criticalErrorListener);
|
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executorFactory.getExecutor(), criticalErrorListener);
|
||||||
}
|
}
|
||||||
final int networkTimeout = dbConf.getJdbcNetworkTimeout();
|
final int networkTimeout = dbConf.getJdbcNetworkTimeout();
|
||||||
|
|
|
@ -1956,6 +1956,13 @@
|
||||||
</xsd:documentation>
|
</xsd:documentation>
|
||||||
</xsd:annotation>
|
</xsd:annotation>
|
||||||
</xsd:element>
|
</xsd:element>
|
||||||
|
<xsd:element name="jdbc-journal-sync-period" type="xsd:string" minOccurs="0" maxOccurs="1">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
The JDBC jouranl sync period in milliseconds.
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:element>
|
||||||
</xsd:all>
|
</xsd:all>
|
||||||
<xsd:attributeGroup ref="xml:specialAttrs"/>
|
<xsd:attributeGroup ref="xml:specialAttrs"/>
|
||||||
</xsd:complexType>
|
</xsd:complexType>
|
||||||
|
|
|
@ -86,7 +86,7 @@ public class JDBCJournalTest extends ActiveMQTestBase {
|
||||||
public void onIOException(Throwable code, String message, SequentialFile file) {
|
public void onIOException(Throwable code, String message, SequentialFile file) {
|
||||||
|
|
||||||
}
|
}
|
||||||
});
|
},5);
|
||||||
journal.start();
|
journal.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue