ARTEMIS-1760 JDBC HA should have configurable tolerance of DB time misalignment

(cherry picked from commit 4842ebe328)
This commit is contained in:
Francesco Nigro 2018-03-27 19:51:16 +02:00 committed by Clebert Suconic
parent 9426f7a3c8
commit 74a0b15710
10 changed files with 138 additions and 50 deletions

View File

@ -449,6 +449,8 @@ public final class ActiveMQDefaultConfiguration {
private static final long DEFAULT_JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS = -1;
private static final long DEFAULT_JDBC_MAX_ALLOWED_MILLIS_FROM_DB_TIME = TimeUnit.SECONDS.toMillis(60);
// Default JMS Bingings table name, used with Database storage type
private static final String DEFAULT_JMS_BINDINGS_TABLE_NAME = "JMS_BINDINGS";
@ -1226,6 +1228,10 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS;
}
public static long getDefaultJdbcMaxAllowedMillisFromDbTime() {
return DEFAULT_JDBC_MAX_ALLOWED_MILLIS_FROM_DB_TIME;
}
public static String getDefaultJMSBindingsTableName() {
return DEFAULT_JMS_BINDINGS_TABLE_NAME;
}

View File

@ -52,6 +52,8 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
private long jdbcLockAcquisitionTimeoutMillis = ActiveMQDefaultConfiguration.getDefaultJdbcLockAcquisitionTimeoutMillis();
private long jdbcMaxAllowedMillisFromDbTime = ActiveMQDefaultConfiguration.getDefaultJdbcMaxAllowedMillisFromDbTime();
@Override
public StoreType getStoreType() {
return StoreType.DATABASE;
@ -185,4 +187,12 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
public void setJdbcLockAcquisitionTimeoutMillis(long jdbcLockAcquisitionTimeoutMillis) {
this.jdbcLockAcquisitionTimeoutMillis = jdbcLockAcquisitionTimeoutMillis;
}
public long getJdbcMaxAllowedMillisFromDbTime() {
return jdbcMaxAllowedMillisFromDbTime;
}
public void setJdbcMaxAllowedMillisFromDbTime(long jdbcMaxAllowedMillisFromDbTime) {
this.jdbcMaxAllowedMillisFromDbTime = jdbcMaxAllowedMillisFromDbTime;
}
}

View File

@ -1166,6 +1166,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
conf.setJdbcNetworkTimeout(getInteger(storeNode, "jdbc-network-timeout", conf.getJdbcNetworkTimeout(), Validators.NO_CHECK));
conf.setJdbcLockRenewPeriodMillis(getLong(storeNode, "jdbc-lock-renew-period", conf.getJdbcLockRenewPeriodMillis(), Validators.NO_CHECK));
conf.setJdbcLockExpirationMillis(getLong(storeNode, "jdbc-lock-expiration", conf.getJdbcLockExpirationMillis(), Validators.NO_CHECK));
conf.setJdbcMaxAllowedMillisFromDbTime(getLong(storeNode, "jdbc-max-allowed-millis-from-db-time", conf.getJdbcMaxAllowedMillisFromDbTime(), Validators.NO_CHECK));
return conf;
}

View File

@ -35,14 +35,12 @@ final class JdbcLeaseLock implements LeaseLock {
private static final Logger LOGGER = Logger.getLogger(JdbcLeaseLock.class);
private static final int MAX_HOLDER_ID_LENGTH = 128;
private final Connection connection;
private final long maxAllowableMillisDiffFromDBTime;
private long millisDiffFromCurrentTime;
private long millisDiffFromDbTime;
private final String holderId;
private final PreparedStatement tryAcquireLock;
private final PreparedStatement tryReleaseLock;
private final PreparedStatement renewLock;
private final PreparedStatement isLocked;
private final PreparedStatement currentDateTime;
private final long expirationMillis;
private boolean maybeAcquired;
@ -56,20 +54,17 @@ final class JdbcLeaseLock implements LeaseLock {
PreparedStatement tryReleaseLock,
PreparedStatement renewLock,
PreparedStatement isLocked,
PreparedStatement currentDateTime,
long expirationMIllis,
long maxAllowableMillisDiffFromDBTime) {
long millisDiffFromDbTime) {
if (holderId.length() > MAX_HOLDER_ID_LENGTH) {
throw new IllegalArgumentException("holderId length must be <=" + MAX_HOLDER_ID_LENGTH);
}
this.holderId = holderId;
this.maxAllowableMillisDiffFromDBTime = maxAllowableMillisDiffFromDBTime;
this.millisDiffFromCurrentTime = Long.MAX_VALUE;
this.millisDiffFromDbTime = millisDiffFromDbTime;
this.tryAcquireLock = tryAcquireLock;
this.tryReleaseLock = tryReleaseLock;
this.renewLock = renewLock;
this.isLocked = isLocked;
this.currentDateTime = currentDateTime;
this.expirationMillis = expirationMIllis;
this.maybeAcquired = false;
this.connection = connection;
@ -84,31 +79,8 @@ final class JdbcLeaseLock implements LeaseLock {
return expirationMillis;
}
private long timeDifference() throws SQLException {
if (Long.MAX_VALUE == millisDiffFromCurrentTime) {
if (maxAllowableMillisDiffFromDBTime > 0) {
millisDiffFromCurrentTime = determineTimeDifference();
} else {
millisDiffFromCurrentTime = 0L;
}
}
return millisDiffFromCurrentTime;
}
private long determineTimeDifference() throws SQLException {
try (ResultSet resultSet = currentDateTime.executeQuery()) {
long result = 0L;
if (resultSet.next()) {
final Timestamp timestamp = resultSet.getTimestamp(1);
final long diff = System.currentTimeMillis() - timestamp.getTime();
if (Math.abs(diff) > maxAllowableMillisDiffFromDBTime) {
// off by more than maxAllowableMillisDiffFromDBTime so lets adjust
result = (-diff);
}
LOGGER.info(holderId() + " diff adjust from db: " + result + ", db time: " + timestamp);
}
return result;
}
private long timeDifference() {
return millisDiffFromDbTime;
}
@Override
@ -162,6 +134,9 @@ final class JdbcLeaseLock implements LeaseLock {
connection.setAutoCommit(true);
if (acquired) {
this.maybeAcquired = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(holderId + " has acquired a lock");
}
}
return acquired;
} catch (SQLException e) {
@ -202,7 +177,9 @@ final class JdbcLeaseLock implements LeaseLock {
final long expiredBy = now - lockExpirationTime;
if (expiredBy > 0) {
result = false;
LOGGER.warn("found zombie lock with holderId: " + currentHolderId + " expired by: " + expiredBy + " ms");
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("found zombie lock with holderId: " + currentHolderId + " expired by: " + expiredBy + " ms");
}
}
}
}
@ -232,7 +209,9 @@ final class JdbcLeaseLock implements LeaseLock {
if (preparedStatement.executeUpdate() != 1) {
LOGGER.warn(holderId + " has failed to release a lock");
} else {
LOGGER.info(holderId + " has released a lock");
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(holderId + " has released a lock");
}
}
//consider it as released to avoid on finalize to be reclaimed
this.maybeAcquired = false;
@ -263,7 +242,6 @@ final class JdbcLeaseLock implements LeaseLock {
this.tryAcquireLock.close();
this.renewLock.close();
this.isLocked.close();
this.currentDateTime.close();
}
}
}

View File

@ -68,11 +68,30 @@ public final class JdbcNodeManager extends NodeManager {
sqlProviderFactory = new PropertySQLProvider.Factory(configuration.getDataSource());
}
final String brokerId = java.util.UUID.randomUUID().toString();
return usingDataSource(brokerId, configuration.getJdbcLockExpirationMillis(), configuration.getJdbcLockRenewPeriodMillis(), configuration.getJdbcLockAcquisitionTimeoutMillis(), configuration.getDataSource(), sqlProviderFactory.create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER), scheduledExecutorService, executorFactory, ioCriticalErrorListener);
return usingDataSource(brokerId,
configuration.getJdbcLockExpirationMillis(),
configuration.getJdbcLockRenewPeriodMillis(),
configuration.getJdbcLockAcquisitionTimeoutMillis(),
configuration.getJdbcMaxAllowedMillisFromDbTime(),
configuration.getDataSource(),
sqlProviderFactory.create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER),
scheduledExecutorService,
executorFactory,
ioCriticalErrorListener);
} else {
final SQLProvider sqlProvider = JDBCUtils.getSQLProvider(configuration.getJdbcDriverClassName(), configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER);
final String brokerId = java.util.UUID.randomUUID().toString();
return usingConnectionUrl(brokerId, configuration.getJdbcLockExpirationMillis(), configuration.getJdbcLockRenewPeriodMillis(), configuration.getJdbcLockAcquisitionTimeoutMillis(), configuration.getJdbcConnectionUrl(), configuration.getJdbcDriverClassName(), sqlProvider, scheduledExecutorService, executorFactory, ioCriticalErrorListener);
return usingConnectionUrl(brokerId,
configuration.getJdbcLockExpirationMillis(),
configuration.getJdbcLockRenewPeriodMillis(),
configuration.getJdbcLockAcquisitionTimeoutMillis(),
configuration.getJdbcMaxAllowedMillisFromDbTime(),
configuration.getJdbcConnectionUrl(),
configuration.getJdbcDriverClassName(),
sqlProvider,
scheduledExecutorService,
executorFactory,
ioCriticalErrorListener);
}
}
@ -80,13 +99,14 @@ public final class JdbcNodeManager extends NodeManager {
long lockExpirationMillis,
long lockRenewPeriodMillis,
long lockAcquisitionTimeoutMillis,
long maxAllowedMillisFromDbTime,
DataSource dataSource,
SQLProvider provider,
ScheduledExecutorService scheduledExecutorService,
ExecutorFactory executorFactory,
IOCriticalErrorListener ioCriticalErrorListener) {
return new JdbcNodeManager(
() -> JdbcSharedStateManager.usingDataSource(brokerId, lockExpirationMillis, dataSource, provider),
() -> JdbcSharedStateManager.usingDataSource(brokerId, lockExpirationMillis, maxAllowedMillisFromDbTime, dataSource, provider),
false,
lockRenewPeriodMillis,
lockAcquisitionTimeoutMillis,
@ -99,6 +119,7 @@ public final class JdbcNodeManager extends NodeManager {
long lockExpirationMillis,
long lockRenewPeriodMillis,
long lockAcquisitionTimeoutMillis,
long maxAllowedMillisFromDbTime,
String jdbcUrl,
String driverClass,
SQLProvider provider,
@ -106,7 +127,7 @@ public final class JdbcNodeManager extends NodeManager {
ExecutorFactory executorFactory,
IOCriticalErrorListener ioCriticalErrorListener) {
return new JdbcNodeManager(
() -> JdbcSharedStateManager.usingConnectionUrl(brokerId, lockExpirationMillis, jdbcUrl, driverClass, provider),
() -> JdbcSharedStateManager.usingConnectionUrl(brokerId, lockExpirationMillis, maxAllowedMillisFromDbTime, jdbcUrl, driverClass, provider),
false,
lockRenewPeriodMillis,
lockAcquisitionTimeoutMillis,

View File

@ -22,6 +22,8 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.function.Supplier;
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
@ -39,6 +41,7 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
public static final int MAX_SETUP_ATTEMPTS = 20;
private final String holderId;
private final long lockExpirationMillis;
private final long maxAllowedMillisFromDbTime;
private JdbcLeaseLock liveLock;
private JdbcLeaseLock backupLock;
private PreparedStatement readNodeId;
@ -46,12 +49,14 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
private PreparedStatement initializeNodeId;
private PreparedStatement readState;
private PreparedStatement writeState;
private long timeDifferenceMillisFromDb = 0;
public static JdbcSharedStateManager usingDataSource(String holderId,
long locksExpirationMillis,
long maxAllowedMillisFromDbTime,
DataSource dataSource,
SQLProvider provider) {
final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis);
final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis, maxAllowedMillisFromDbTime);
sharedStateManager.setDataSource(dataSource);
sharedStateManager.setSqlProvider(provider);
try {
@ -64,10 +69,11 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
public static JdbcSharedStateManager usingConnectionUrl(String holderId,
long locksExpirationMillis,
long maxAllowedMillisFromDbTime,
String jdbcConnectionUrl,
String jdbcDriverClass,
SQLProvider provider) {
final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis);
final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis, maxAllowedMillisFromDbTime);
sharedStateManager.setJdbcConnectionUrl(jdbcConnectionUrl);
sharedStateManager.setJdbcDriverClass(jdbcDriverClass);
sharedStateManager.setSqlProvider(provider);
@ -91,26 +97,52 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
}
}
/**
* It computes the distance in milliseconds of {@link System#currentTimeMillis()} from the DBMS time.<br>
* It must be added to {@link System#currentTimeMillis()} in order to approximate the DBMS time.
* It will create a transaction by its own.
*/
static long timeDifferenceMillisFromDb(Connection connection, SQLProvider sqlProvider) throws SQLException {
try (Statement statement = connection.createStatement()) {
connection.setAutoCommit(false);
final long result;
try (ResultSet resultSet = statement.executeQuery(sqlProvider.currentTimestampSQL())) {
resultSet.next();
final Timestamp timestamp = resultSet.getTimestamp(1);
final long systemNow = System.currentTimeMillis();
result = timestamp.getTime() - systemNow;
} catch (SQLException ie) {
connection.rollback();
connection.setAutoCommit(true);
throw ie;
}
connection.commit();
connection.setAutoCommit(true);
return result;
}
}
static JdbcLeaseLock createLiveLock(String holderId,
Connection connection,
SQLProvider sqlProvider,
long expirationMillis,
long maxAllowableMillisDiffFromDBtime) throws SQLException {
return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireLiveLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseLiveLockSQL()), connection.prepareStatement(sqlProvider.renewLiveLockSQL()), connection.prepareStatement(sqlProvider.isLiveLockedSQL()), connection.prepareStatement(sqlProvider.currentTimestampSQL()), expirationMillis, maxAllowableMillisDiffFromDBtime);
long timeDifferenceMillisFromDb) throws SQLException {
return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireLiveLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseLiveLockSQL()), connection.prepareStatement(sqlProvider.renewLiveLockSQL()), connection.prepareStatement(sqlProvider.isLiveLockedSQL()), expirationMillis, timeDifferenceMillisFromDb);
}
static JdbcLeaseLock createBackupLock(String holderId,
Connection connection,
SQLProvider sqlProvider,
long expirationMillis,
long maxAllowableMillisDiffFromDBtime) throws SQLException {
return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireBackupLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseBackupLockSQL()), connection.prepareStatement(sqlProvider.renewBackupLockSQL()), connection.prepareStatement(sqlProvider.isBackupLockedSQL()), connection.prepareStatement(sqlProvider.currentTimestampSQL()), expirationMillis, maxAllowableMillisDiffFromDBtime);
long timeDifferenceMillisFromDb) throws SQLException {
return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireBackupLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseBackupLockSQL()), connection.prepareStatement(sqlProvider.renewBackupLockSQL()), connection.prepareStatement(sqlProvider.isBackupLockedSQL()), expirationMillis, timeDifferenceMillisFromDb);
}
@Override
protected void prepareStatements() throws SQLException {
this.liveLock = createLiveLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis, 0);
this.backupLock = createBackupLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis, 0);
final long timeDifferenceMillisFromDb = validateTimeDifferenceMillisFromDb();
this.liveLock = createLiveLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis, timeDifferenceMillisFromDb);
this.backupLock = createBackupLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis, timeDifferenceMillisFromDb);
this.readNodeId = connection.prepareStatement(sqlProvider.readNodeIdSQL());
this.writeNodeId = connection.prepareStatement(sqlProvider.writeNodeIdSQL());
this.initializeNodeId = connection.prepareStatement(sqlProvider.initializeNodeIdSQL());
@ -118,9 +150,32 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
this.readState = connection.prepareStatement(sqlProvider.readStateSQL());
}
private JdbcSharedStateManager(String holderId, long lockExpirationMillis) {
/**
* It will be populated only after a {@link #start()}.
*/
long timeDifferenceMillisFromDb() {
return timeDifferenceMillisFromDb;
}
private long validateTimeDifferenceMillisFromDb() throws SQLException {
final long timeDifferenceMillisFromDb = timeDifferenceMillisFromDb(connection, sqlProvider);
this.timeDifferenceMillisFromDb = timeDifferenceMillisFromDb;
final long absoluteTimeDifference = Math.abs(timeDifferenceMillisFromDb);
if (absoluteTimeDifference > maxAllowedMillisFromDbTime) {
throw new IllegalStateException("The system is far " + (-timeDifferenceMillisFromDb) + " milliseconds from DB time, exceeding maxAllowedMillisFromDbTime = " + maxAllowedMillisFromDbTime);
}
if (absoluteTimeDifference > 0) {
final String msg = "The system is far " + timeDifferenceMillisFromDb + " milliseconds from DB time";
final Logger.Level logLevel = absoluteTimeDifference > lockExpirationMillis ? Logger.Level.WARN : Logger.Level.DEBUG;
logger.log(logLevel, msg);
}
return timeDifferenceMillisFromDb;
}
private JdbcSharedStateManager(String holderId, long lockExpirationMillis, long maxAllowedMillisFromDbTime) {
this.holderId = holderId;
this.lockExpirationMillis = lockExpirationMillis;
this.maxAllowedMillisFromDbTime = maxAllowedMillisFromDbTime;
}
@Override

View File

@ -1736,6 +1736,13 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="jdbc-max-allowed-millis-from-db-time" type="xsd:int" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
The absolute time in milliseconds the system clock is allowed to be distant from the DB time
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="jms-bindings-table-name" type="xsd:string" minOccurs="1" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>

View File

@ -52,7 +52,7 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
jdbcSharedStateManager.getConnection(),
sqlProvider,
acquireMillis,
0);
jdbcSharedStateManager.timeDifferenceMillisFromDb());
} catch (SQLException e) {
throw new IllegalStateException(e);
}
@ -69,6 +69,7 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
.usingConnectionUrl(
UUID.randomUUID().toString(),
dbConf.getJdbcLockExpirationMillis(),
dbConf.getJdbcMaxAllowedMillisFromDbTime(),
dbConf.getJdbcConnectionUrl(),
dbConf.getJdbcDriverClassName(),
sqlProvider);

View File

@ -473,6 +473,7 @@ public abstract class ActiveMQTestBase extends Assert {
dbStorageConfiguration.setJdbcLockAcquisitionTimeoutMillis(getJdbcLockAcquisitionTimeoutMillis());
dbStorageConfiguration.setJdbcLockExpirationMillis(getJdbcLockExpirationMillis());
dbStorageConfiguration.setJdbcLockRenewPeriodMillis(getJdbcLockRenewPeriodMillis());
dbStorageConfiguration.setJdbcMaxAllowedMillisFromDbTime(getJdbcMaxAllowedMillisFromDbTime());
return dbStorageConfiguration;
}
@ -488,6 +489,10 @@ public abstract class ActiveMQTestBase extends Assert {
return Long.getLong("jdbc.lock.renew", ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis());
}
protected long getJdbcMaxAllowedMillisFromDbTime() {
return Long.getLong("jdbc.max.diff.db", ActiveMQDefaultConfiguration.getDefaultJdbcMaxAllowedMillisFromDbTime());
}
public void destroyTables(List<String> tableNames) throws Exception {
Driver driver = getDriver(getJDBCClassName());
Connection connection = driver.connect(getTestJDBCConnectionUrl(), null);

View File

@ -447,6 +447,10 @@ To configure Apache ActiveMQ Artemis to use a database for persisting messages a
The time in milliseconds a JDBC lock is considered valid without keeping it alive. The default value
is 20000 milliseconds (ie 20 seconds).
- `jdbc-max-allowed-millis-from-db-time`
The absolute time in milliseconds the system clock is allowed to be distant from the DB time, otherwise a critical error will be raised. The default value is 60000 milliseconds (ie 60 seconds).
## Configuring Apache ActiveMQ Artemis for Zero Persistence
In some situations, zero persistence is sometimes required for a