This closes #1962
This commit is contained in:
commit
586739ff46
|
@ -296,6 +296,9 @@ public class Create extends InputAbstract {
|
|||
@Option(name = "--jdbc-lock-expiration", description = "Lock expiration")
|
||||
long jdbcLockExpiration = ActiveMQDefaultConfiguration.getDefaultJdbcLockExpirationMillis();
|
||||
|
||||
@Option(name = "--jdbc-max-allowed-millis-from-db-time", description = "Db time allowed difference")
|
||||
long jdbcMaxAllowedMillisFromDbTime = ActiveMQDefaultConfiguration.getDefaultJdbcMaxAllowedMillisFromDbTime();
|
||||
|
||||
private boolean IS_WINDOWS;
|
||||
private boolean IS_CYGWIN;
|
||||
|
||||
|
@ -622,6 +625,7 @@ public class Create extends InputAbstract {
|
|||
filters.put("${jdbcNetworkTimeout}", "" + jdbcNetworkTimeout);
|
||||
filters.put("${jdbcLockRenewPeriod}", "" + jdbcLockRenewPeriod);
|
||||
filters.put("${jdbcLockExpiration}", "" + jdbcLockExpiration);
|
||||
filters.put("${jdbcMaxAllowedMillisFromDbTime}", "" + jdbcMaxAllowedMillisFromDbTime);
|
||||
filters.put("${jdbc}", readTextFile(ETC_DATABASE_STORE_TXT, filters));
|
||||
} else {
|
||||
filters.put("${jdbc}", "");
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
<page-store-table-name>${jdbcPageStore}</page-store-table-name>
|
||||
<jdbc-lock-expiration>${jdbcLockExpiration}</jdbc-lock-expiration>
|
||||
<jdbc-lock-renew-period>${jdbcLockRenewPeriod}</jdbc-lock-renew-period>
|
||||
<jdbc-max-allowed-millis-from-db-time>${jdbcMaxAllowedMillisFromDbTime}</jdbc-max-allowed-millis-from-db-time>
|
||||
<jdbc-network-timeout>${jdbcNetworkTimeout}</jdbc-network-timeout>
|
||||
</database-store>
|
||||
</store>
|
||||
|
|
|
@ -451,6 +451,8 @@ public final class ActiveMQDefaultConfiguration {
|
|||
|
||||
private static final long DEFAULT_JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS = -1;
|
||||
|
||||
private static final long DEFAULT_JDBC_MAX_ALLOWED_MILLIS_FROM_DB_TIME = TimeUnit.SECONDS.toMillis(60);
|
||||
|
||||
// Default period to wait between connection TTL checks
|
||||
public static final long DEFAULT_CONNECTION_TTL_CHECK_INTERVAL = 2000;
|
||||
|
||||
|
@ -1256,6 +1258,10 @@ public final class ActiveMQDefaultConfiguration {
|
|||
return DEFAULT_JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS;
|
||||
}
|
||||
|
||||
public static long getDefaultJdbcMaxAllowedMillisFromDbTime() {
|
||||
return DEFAULT_JDBC_MAX_ALLOWED_MILLIS_FROM_DB_TIME;
|
||||
}
|
||||
|
||||
public static long getDefaultConnectionTtlCheckInterval() {
|
||||
return DEFAULT_CONNECTION_TTL_CHECK_INTERVAL;
|
||||
}
|
||||
|
|
|
@ -50,6 +50,8 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
|
|||
|
||||
private long jdbcLockAcquisitionTimeoutMillis = ActiveMQDefaultConfiguration.getDefaultJdbcLockAcquisitionTimeoutMillis();
|
||||
|
||||
private long jdbcMaxAllowedMillisFromDbTime = ActiveMQDefaultConfiguration.getDefaultJdbcMaxAllowedMillisFromDbTime();
|
||||
|
||||
private long jdbcJournalSyncPeriodMillis = ActiveMQDefaultConfiguration.getDefaultJdbcJournalSyncPeriodMillis();
|
||||
|
||||
@Override
|
||||
|
@ -185,4 +187,12 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
|
|||
public void setJdbcJournalSyncPeriodMillis(long jdbcJournalSyncPeriodMillis) {
|
||||
this.jdbcJournalSyncPeriodMillis = jdbcJournalSyncPeriodMillis;
|
||||
}
|
||||
|
||||
public long getJdbcMaxAllowedMillisFromDbTime() {
|
||||
return jdbcMaxAllowedMillisFromDbTime;
|
||||
}
|
||||
|
||||
public void setJdbcMaxAllowedMillisFromDbTime(long jdbcMaxAllowedMillisFromDbTime) {
|
||||
this.jdbcMaxAllowedMillisFromDbTime = jdbcMaxAllowedMillisFromDbTime;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1472,6 +1472,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
conf.setJdbcNetworkTimeout(getInteger(storeNode, "jdbc-network-timeout", conf.getJdbcNetworkTimeout(), Validators.NO_CHECK));
|
||||
conf.setJdbcLockRenewPeriodMillis(getLong(storeNode, "jdbc-lock-renew-period", conf.getJdbcLockRenewPeriodMillis(), Validators.NO_CHECK));
|
||||
conf.setJdbcLockExpirationMillis(getLong(storeNode, "jdbc-lock-expiration", conf.getJdbcLockExpirationMillis(), Validators.NO_CHECK));
|
||||
conf.setJdbcMaxAllowedMillisFromDbTime(getLong(storeNode, "jdbc-max-allowed-millis-from-db-time", conf.getJdbcMaxAllowedMillisFromDbTime(), Validators.NO_CHECK));
|
||||
conf.setJdbcJournalSyncPeriodMillis(getLong(storeNode, "jdbc-journal-sync-period", conf.getJdbcJournalSyncPeriodMillis(), Validators.NO_CHECK));
|
||||
return conf;
|
||||
}
|
||||
|
|
|
@ -35,14 +35,12 @@ final class JdbcLeaseLock implements LeaseLock {
|
|||
private static final Logger LOGGER = Logger.getLogger(JdbcLeaseLock.class);
|
||||
private static final int MAX_HOLDER_ID_LENGTH = 128;
|
||||
private final Connection connection;
|
||||
private final long maxAllowableMillisDiffFromDBTime;
|
||||
private long millisDiffFromCurrentTime;
|
||||
private long millisDiffFromDbTime;
|
||||
private final String holderId;
|
||||
private final PreparedStatement tryAcquireLock;
|
||||
private final PreparedStatement tryReleaseLock;
|
||||
private final PreparedStatement renewLock;
|
||||
private final PreparedStatement isLocked;
|
||||
private final PreparedStatement currentDateTime;
|
||||
private final long expirationMillis;
|
||||
private boolean maybeAcquired;
|
||||
|
||||
|
@ -56,20 +54,17 @@ final class JdbcLeaseLock implements LeaseLock {
|
|||
PreparedStatement tryReleaseLock,
|
||||
PreparedStatement renewLock,
|
||||
PreparedStatement isLocked,
|
||||
PreparedStatement currentDateTime,
|
||||
long expirationMIllis,
|
||||
long maxAllowableMillisDiffFromDBTime) {
|
||||
long millisDiffFromDbTime) {
|
||||
if (holderId.length() > MAX_HOLDER_ID_LENGTH) {
|
||||
throw new IllegalArgumentException("holderId length must be <=" + MAX_HOLDER_ID_LENGTH);
|
||||
}
|
||||
this.holderId = holderId;
|
||||
this.maxAllowableMillisDiffFromDBTime = maxAllowableMillisDiffFromDBTime;
|
||||
this.millisDiffFromCurrentTime = Long.MAX_VALUE;
|
||||
this.millisDiffFromDbTime = millisDiffFromDbTime;
|
||||
this.tryAcquireLock = tryAcquireLock;
|
||||
this.tryReleaseLock = tryReleaseLock;
|
||||
this.renewLock = renewLock;
|
||||
this.isLocked = isLocked;
|
||||
this.currentDateTime = currentDateTime;
|
||||
this.expirationMillis = expirationMIllis;
|
||||
this.maybeAcquired = false;
|
||||
this.connection = connection;
|
||||
|
@ -84,31 +79,8 @@ final class JdbcLeaseLock implements LeaseLock {
|
|||
return expirationMillis;
|
||||
}
|
||||
|
||||
private long timeDifference() throws SQLException {
|
||||
if (Long.MAX_VALUE == millisDiffFromCurrentTime) {
|
||||
if (maxAllowableMillisDiffFromDBTime > 0) {
|
||||
millisDiffFromCurrentTime = determineTimeDifference();
|
||||
} else {
|
||||
millisDiffFromCurrentTime = 0L;
|
||||
}
|
||||
}
|
||||
return millisDiffFromCurrentTime;
|
||||
}
|
||||
|
||||
private long determineTimeDifference() throws SQLException {
|
||||
try (ResultSet resultSet = currentDateTime.executeQuery()) {
|
||||
long result = 0L;
|
||||
if (resultSet.next()) {
|
||||
final Timestamp timestamp = resultSet.getTimestamp(1);
|
||||
final long diff = System.currentTimeMillis() - timestamp.getTime();
|
||||
if (Math.abs(diff) > maxAllowableMillisDiffFromDBTime) {
|
||||
// off by more than maxAllowableMillisDiffFromDBTime so lets adjust
|
||||
result = (-diff);
|
||||
}
|
||||
LOGGER.info(holderId() + " diff adjust from db: " + result + ", db time: " + timestamp);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
private long timeDifference() {
|
||||
return millisDiffFromDbTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -162,6 +134,9 @@ final class JdbcLeaseLock implements LeaseLock {
|
|||
connection.setAutoCommit(true);
|
||||
if (acquired) {
|
||||
this.maybeAcquired = true;
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug(holderId + " has acquired a lock");
|
||||
}
|
||||
}
|
||||
return acquired;
|
||||
} catch (SQLException e) {
|
||||
|
@ -202,7 +177,9 @@ final class JdbcLeaseLock implements LeaseLock {
|
|||
final long expiredBy = now - lockExpirationTime;
|
||||
if (expiredBy > 0) {
|
||||
result = false;
|
||||
LOGGER.warn("found zombie lock with holderId: " + currentHolderId + " expired by: " + expiredBy + " ms");
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("found zombie lock with holderId: " + currentHolderId + " expired by: " + expiredBy + " ms");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -232,7 +209,9 @@ final class JdbcLeaseLock implements LeaseLock {
|
|||
if (preparedStatement.executeUpdate() != 1) {
|
||||
LOGGER.warn(holderId + " has failed to release a lock");
|
||||
} else {
|
||||
LOGGER.info(holderId + " has released a lock");
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug(holderId + " has released a lock");
|
||||
}
|
||||
}
|
||||
//consider it as released to avoid on finalize to be reclaimed
|
||||
this.maybeAcquired = false;
|
||||
|
@ -263,7 +242,6 @@ final class JdbcLeaseLock implements LeaseLock {
|
|||
this.tryAcquireLock.close();
|
||||
this.renewLock.close();
|
||||
this.isLocked.close();
|
||||
this.currentDateTime.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -68,11 +68,30 @@ public final class JdbcNodeManager extends NodeManager {
|
|||
sqlProviderFactory = new PropertySQLProvider.Factory(configuration.getDataSource());
|
||||
}
|
||||
final String brokerId = java.util.UUID.randomUUID().toString();
|
||||
return usingDataSource(brokerId, configuration.getJdbcLockExpirationMillis(), configuration.getJdbcLockRenewPeriodMillis(), configuration.getJdbcLockAcquisitionTimeoutMillis(), configuration.getDataSource(), sqlProviderFactory.create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER), scheduledExecutorService, executorFactory, ioCriticalErrorListener);
|
||||
return usingDataSource(brokerId,
|
||||
configuration.getJdbcLockExpirationMillis(),
|
||||
configuration.getJdbcLockRenewPeriodMillis(),
|
||||
configuration.getJdbcLockAcquisitionTimeoutMillis(),
|
||||
configuration.getJdbcMaxAllowedMillisFromDbTime(),
|
||||
configuration.getDataSource(),
|
||||
sqlProviderFactory.create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER),
|
||||
scheduledExecutorService,
|
||||
executorFactory,
|
||||
ioCriticalErrorListener);
|
||||
} else {
|
||||
final SQLProvider sqlProvider = JDBCUtils.getSQLProvider(configuration.getJdbcDriverClassName(), configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER);
|
||||
final String brokerId = java.util.UUID.randomUUID().toString();
|
||||
return usingConnectionUrl(brokerId, configuration.getJdbcLockExpirationMillis(), configuration.getJdbcLockRenewPeriodMillis(), configuration.getJdbcLockAcquisitionTimeoutMillis(), configuration.getJdbcConnectionUrl(), configuration.getJdbcDriverClassName(), sqlProvider, scheduledExecutorService, executorFactory, ioCriticalErrorListener);
|
||||
return usingConnectionUrl(brokerId,
|
||||
configuration.getJdbcLockExpirationMillis(),
|
||||
configuration.getJdbcLockRenewPeriodMillis(),
|
||||
configuration.getJdbcLockAcquisitionTimeoutMillis(),
|
||||
configuration.getJdbcMaxAllowedMillisFromDbTime(),
|
||||
configuration.getJdbcConnectionUrl(),
|
||||
configuration.getJdbcDriverClassName(),
|
||||
sqlProvider,
|
||||
scheduledExecutorService,
|
||||
executorFactory,
|
||||
ioCriticalErrorListener);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -80,13 +99,14 @@ public final class JdbcNodeManager extends NodeManager {
|
|||
long lockExpirationMillis,
|
||||
long lockRenewPeriodMillis,
|
||||
long lockAcquisitionTimeoutMillis,
|
||||
long maxAllowedMillisFromDbTime,
|
||||
DataSource dataSource,
|
||||
SQLProvider provider,
|
||||
ScheduledExecutorService scheduledExecutorService,
|
||||
ExecutorFactory executorFactory,
|
||||
IOCriticalErrorListener ioCriticalErrorListener) {
|
||||
return new JdbcNodeManager(
|
||||
() -> JdbcSharedStateManager.usingDataSource(brokerId, lockExpirationMillis, dataSource, provider),
|
||||
() -> JdbcSharedStateManager.usingDataSource(brokerId, lockExpirationMillis, maxAllowedMillisFromDbTime, dataSource, provider),
|
||||
false,
|
||||
lockRenewPeriodMillis,
|
||||
lockAcquisitionTimeoutMillis,
|
||||
|
@ -99,6 +119,7 @@ public final class JdbcNodeManager extends NodeManager {
|
|||
long lockExpirationMillis,
|
||||
long lockRenewPeriodMillis,
|
||||
long lockAcquisitionTimeoutMillis,
|
||||
long maxAllowedMillisFromDbTime,
|
||||
String jdbcUrl,
|
||||
String driverClass,
|
||||
SQLProvider provider,
|
||||
|
@ -106,7 +127,7 @@ public final class JdbcNodeManager extends NodeManager {
|
|||
ExecutorFactory executorFactory,
|
||||
IOCriticalErrorListener ioCriticalErrorListener) {
|
||||
return new JdbcNodeManager(
|
||||
() -> JdbcSharedStateManager.usingConnectionUrl(brokerId, lockExpirationMillis, jdbcUrl, driverClass, provider),
|
||||
() -> JdbcSharedStateManager.usingConnectionUrl(brokerId, lockExpirationMillis, maxAllowedMillisFromDbTime, jdbcUrl, driverClass, provider),
|
||||
false,
|
||||
lockRenewPeriodMillis,
|
||||
lockAcquisitionTimeoutMillis,
|
||||
|
|
|
@ -22,6 +22,8 @@ import java.sql.Connection;
|
|||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.sql.Timestamp;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
|
||||
|
@ -39,6 +41,7 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
|
|||
public static final int MAX_SETUP_ATTEMPTS = 20;
|
||||
private final String holderId;
|
||||
private final long lockExpirationMillis;
|
||||
private final long maxAllowedMillisFromDbTime;
|
||||
private JdbcLeaseLock liveLock;
|
||||
private JdbcLeaseLock backupLock;
|
||||
private PreparedStatement readNodeId;
|
||||
|
@ -46,12 +49,14 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
|
|||
private PreparedStatement initializeNodeId;
|
||||
private PreparedStatement readState;
|
||||
private PreparedStatement writeState;
|
||||
private long timeDifferenceMillisFromDb = 0;
|
||||
|
||||
public static JdbcSharedStateManager usingDataSource(String holderId,
|
||||
long locksExpirationMillis,
|
||||
long maxAllowedMillisFromDbTime,
|
||||
DataSource dataSource,
|
||||
SQLProvider provider) {
|
||||
final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis);
|
||||
final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis, maxAllowedMillisFromDbTime);
|
||||
sharedStateManager.setDataSource(dataSource);
|
||||
sharedStateManager.setSqlProvider(provider);
|
||||
try {
|
||||
|
@ -64,10 +69,11 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
|
|||
|
||||
public static JdbcSharedStateManager usingConnectionUrl(String holderId,
|
||||
long locksExpirationMillis,
|
||||
long maxAllowedMillisFromDbTime,
|
||||
String jdbcConnectionUrl,
|
||||
String jdbcDriverClass,
|
||||
SQLProvider provider) {
|
||||
final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis);
|
||||
final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis, maxAllowedMillisFromDbTime);
|
||||
sharedStateManager.setJdbcConnectionUrl(jdbcConnectionUrl);
|
||||
sharedStateManager.setJdbcDriverClass(jdbcDriverClass);
|
||||
sharedStateManager.setSqlProvider(provider);
|
||||
|
@ -91,26 +97,52 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* It computes the distance in milliseconds of {@link System#currentTimeMillis()} from the DBMS time.<br>
|
||||
* It must be added to {@link System#currentTimeMillis()} in order to approximate the DBMS time.
|
||||
* It will create a transaction by its own.
|
||||
*/
|
||||
static long timeDifferenceMillisFromDb(Connection connection, SQLProvider sqlProvider) throws SQLException {
|
||||
try (Statement statement = connection.createStatement()) {
|
||||
connection.setAutoCommit(false);
|
||||
final long result;
|
||||
try (ResultSet resultSet = statement.executeQuery(sqlProvider.currentTimestampSQL())) {
|
||||
resultSet.next();
|
||||
final Timestamp timestamp = resultSet.getTimestamp(1);
|
||||
final long systemNow = System.currentTimeMillis();
|
||||
result = timestamp.getTime() - systemNow;
|
||||
} catch (SQLException ie) {
|
||||
connection.rollback();
|
||||
connection.setAutoCommit(true);
|
||||
throw ie;
|
||||
}
|
||||
connection.commit();
|
||||
connection.setAutoCommit(true);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
static JdbcLeaseLock createLiveLock(String holderId,
|
||||
Connection connection,
|
||||
SQLProvider sqlProvider,
|
||||
long expirationMillis,
|
||||
long maxAllowableMillisDiffFromDBtime) throws SQLException {
|
||||
return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireLiveLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseLiveLockSQL()), connection.prepareStatement(sqlProvider.renewLiveLockSQL()), connection.prepareStatement(sqlProvider.isLiveLockedSQL()), connection.prepareStatement(sqlProvider.currentTimestampSQL()), expirationMillis, maxAllowableMillisDiffFromDBtime);
|
||||
long timeDifferenceMillisFromDb) throws SQLException {
|
||||
return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireLiveLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseLiveLockSQL()), connection.prepareStatement(sqlProvider.renewLiveLockSQL()), connection.prepareStatement(sqlProvider.isLiveLockedSQL()), expirationMillis, timeDifferenceMillisFromDb);
|
||||
}
|
||||
|
||||
static JdbcLeaseLock createBackupLock(String holderId,
|
||||
Connection connection,
|
||||
SQLProvider sqlProvider,
|
||||
long expirationMillis,
|
||||
long maxAllowableMillisDiffFromDBtime) throws SQLException {
|
||||
return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireBackupLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseBackupLockSQL()), connection.prepareStatement(sqlProvider.renewBackupLockSQL()), connection.prepareStatement(sqlProvider.isBackupLockedSQL()), connection.prepareStatement(sqlProvider.currentTimestampSQL()), expirationMillis, maxAllowableMillisDiffFromDBtime);
|
||||
long timeDifferenceMillisFromDb) throws SQLException {
|
||||
return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireBackupLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseBackupLockSQL()), connection.prepareStatement(sqlProvider.renewBackupLockSQL()), connection.prepareStatement(sqlProvider.isBackupLockedSQL()), expirationMillis, timeDifferenceMillisFromDb);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void prepareStatements() throws SQLException {
|
||||
this.liveLock = createLiveLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis, 0);
|
||||
this.backupLock = createBackupLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis, 0);
|
||||
final long timeDifferenceMillisFromDb = validateTimeDifferenceMillisFromDb();
|
||||
this.liveLock = createLiveLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis, timeDifferenceMillisFromDb);
|
||||
this.backupLock = createBackupLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis, timeDifferenceMillisFromDb);
|
||||
this.readNodeId = connection.prepareStatement(sqlProvider.readNodeIdSQL());
|
||||
this.writeNodeId = connection.prepareStatement(sqlProvider.writeNodeIdSQL());
|
||||
this.initializeNodeId = connection.prepareStatement(sqlProvider.initializeNodeIdSQL());
|
||||
|
@ -118,9 +150,32 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
|
|||
this.readState = connection.prepareStatement(sqlProvider.readStateSQL());
|
||||
}
|
||||
|
||||
private JdbcSharedStateManager(String holderId, long lockExpirationMillis) {
|
||||
/**
|
||||
* It will be populated only after a {@link #start()}.
|
||||
*/
|
||||
long timeDifferenceMillisFromDb() {
|
||||
return timeDifferenceMillisFromDb;
|
||||
}
|
||||
|
||||
private long validateTimeDifferenceMillisFromDb() throws SQLException {
|
||||
final long timeDifferenceMillisFromDb = timeDifferenceMillisFromDb(connection, sqlProvider);
|
||||
this.timeDifferenceMillisFromDb = timeDifferenceMillisFromDb;
|
||||
final long absoluteTimeDifference = Math.abs(timeDifferenceMillisFromDb);
|
||||
if (absoluteTimeDifference > maxAllowedMillisFromDbTime) {
|
||||
throw new IllegalStateException("The system is far " + (-timeDifferenceMillisFromDb) + " milliseconds from DB time, exceeding maxAllowedMillisFromDbTime = " + maxAllowedMillisFromDbTime);
|
||||
}
|
||||
if (absoluteTimeDifference > 0) {
|
||||
final String msg = "The system is far " + timeDifferenceMillisFromDb + " milliseconds from DB time";
|
||||
final Logger.Level logLevel = absoluteTimeDifference > lockExpirationMillis ? Logger.Level.WARN : Logger.Level.DEBUG;
|
||||
logger.log(logLevel, msg);
|
||||
}
|
||||
return timeDifferenceMillisFromDb;
|
||||
}
|
||||
|
||||
private JdbcSharedStateManager(String holderId, long lockExpirationMillis, long maxAllowedMillisFromDbTime) {
|
||||
this.holderId = holderId;
|
||||
this.lockExpirationMillis = lockExpirationMillis;
|
||||
this.maxAllowedMillisFromDbTime = maxAllowedMillisFromDbTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1963,6 +1963,13 @@
|
|||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
<xsd:element name="jdbc-max-allowed-millis-from-db-time" type="xsd:int" minOccurs="0" maxOccurs="1">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
The absolute time in milliseconds the system clock is allowed to be distant from the DB time
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
</xsd:all>
|
||||
<xsd:attributeGroup ref="xml:specialAttrs"/>
|
||||
</xsd:complexType>
|
||||
|
|
|
@ -72,7 +72,7 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
|
|||
jdbcSharedStateManager.getConnection(),
|
||||
sqlProvider,
|
||||
acquireMillis,
|
||||
0);
|
||||
jdbcSharedStateManager.timeDifferenceMillisFromDb());
|
||||
} catch (SQLException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
|
@ -100,6 +100,7 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
|
|||
.usingConnectionUrl(
|
||||
UUID.randomUUID().toString(),
|
||||
dbConf.getJdbcLockExpirationMillis(),
|
||||
dbConf.getJdbcMaxAllowedMillisFromDbTime(),
|
||||
dbConf.getJdbcConnectionUrl(),
|
||||
dbConf.getJdbcDriverClassName(),
|
||||
sqlProvider);
|
||||
|
|
|
@ -479,6 +479,7 @@ public abstract class ActiveMQTestBase extends Assert {
|
|||
dbStorageConfiguration.setJdbcLockAcquisitionTimeoutMillis(getJdbcLockAcquisitionTimeoutMillis());
|
||||
dbStorageConfiguration.setJdbcLockExpirationMillis(getJdbcLockExpirationMillis());
|
||||
dbStorageConfiguration.setJdbcLockRenewPeriodMillis(getJdbcLockRenewPeriodMillis());
|
||||
dbStorageConfiguration.setJdbcMaxAllowedMillisFromDbTime(getJdbcMaxAllowedMillisFromDbTime());
|
||||
return dbStorageConfiguration;
|
||||
}
|
||||
|
||||
|
@ -494,6 +495,10 @@ public abstract class ActiveMQTestBase extends Assert {
|
|||
return Long.getLong("jdbc.lock.renew", ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis());
|
||||
}
|
||||
|
||||
protected long getJdbcMaxAllowedMillisFromDbTime() {
|
||||
return Long.getLong("jdbc.max.diff.db", ActiveMQDefaultConfiguration.getDefaultJdbcMaxAllowedMillisFromDbTime());
|
||||
}
|
||||
|
||||
public void destroyTables(List<String> tableNames) throws Exception {
|
||||
Driver driver = getDriver(getJDBCClassName());
|
||||
Connection connection = driver.connect(getTestJDBCConnectionUrl(), null);
|
||||
|
|
|
@ -461,6 +461,10 @@ To configure Apache ActiveMQ Artemis to use a database for persisting messages a
|
|||
The time in milliseconds a JDBC lock is considered valid without keeping it alive. The default value
|
||||
is 20000 milliseconds (ie 20 seconds).
|
||||
|
||||
- `jdbc-max-allowed-millis-from-db-time`
|
||||
|
||||
The absolute time in milliseconds the system clock is allowed to be distant from the DB time, otherwise a critical error will be raised. The default value is 60000 milliseconds (ie 60 seconds).
|
||||
|
||||
Note that some DBMS (e.g. Oracle, 30 chars) have restrictions on the size of table names, this should be taken into consideration when configuring table names for the Artemis database store, pay particular attention to the page store table name, which can be appended with a unique ID of up to 20 characters. (for Oracle this would mean configuring a page-store-table-name of max size of 10 chars).
|
||||
|
||||
## Zero Persistence
|
||||
|
|
Loading…
Reference in New Issue