ARTEMIS-1784 JDBC NodeManager should just use DMBS clock

It avoid using the system clock to perform the locks logic
by using the DBMS time.
It contains several improvements on the JDBC error handling
and an improved observability thanks to debug logs.
This commit is contained in:
Francesco Nigro 2018-04-04 18:37:12 +02:00
parent ae8b261c4d
commit 6e9195224c
14 changed files with 377 additions and 333 deletions

View File

@ -299,9 +299,6 @@ public class Create extends InputAbstract {
@Option(name = "--jdbc-lock-expiration", description = "Lock expiration") @Option(name = "--jdbc-lock-expiration", description = "Lock expiration")
long jdbcLockExpiration = ActiveMQDefaultConfiguration.getDefaultJdbcLockExpirationMillis(); long jdbcLockExpiration = ActiveMQDefaultConfiguration.getDefaultJdbcLockExpirationMillis();
@Option(name = "--jdbc-max-allowed-millis-from-db-time", description = "Db time allowed difference")
long jdbcMaxAllowedMillisFromDbTime = ActiveMQDefaultConfiguration.getDefaultJdbcMaxAllowedMillisFromDbTime();
private boolean IS_WINDOWS; private boolean IS_WINDOWS;
private boolean IS_CYGWIN; private boolean IS_CYGWIN;
@ -629,7 +626,6 @@ public class Create extends InputAbstract {
filters.put("${jdbcNetworkTimeout}", "" + jdbcNetworkTimeout); filters.put("${jdbcNetworkTimeout}", "" + jdbcNetworkTimeout);
filters.put("${jdbcLockRenewPeriod}", "" + jdbcLockRenewPeriod); filters.put("${jdbcLockRenewPeriod}", "" + jdbcLockRenewPeriod);
filters.put("${jdbcLockExpiration}", "" + jdbcLockExpiration); filters.put("${jdbcLockExpiration}", "" + jdbcLockExpiration);
filters.put("${jdbcMaxAllowedMillisFromDbTime}", "" + jdbcMaxAllowedMillisFromDbTime);
filters.put("${jdbc}", readTextFile(ETC_DATABASE_STORE_TXT, filters)); filters.put("${jdbc}", readTextFile(ETC_DATABASE_STORE_TXT, filters));
} else { } else {
filters.put("${jdbc}", ""); filters.put("${jdbc}", "");

View File

@ -13,7 +13,6 @@
<node-manager-store-table-name>${jdbcNodeManager}</node-manager-store-table-name> <node-manager-store-table-name>${jdbcNodeManager}</node-manager-store-table-name>
<jdbc-lock-expiration>${jdbcLockExpiration}</jdbc-lock-expiration> <jdbc-lock-expiration>${jdbcLockExpiration}</jdbc-lock-expiration>
<jdbc-lock-renew-period>${jdbcLockRenewPeriod}</jdbc-lock-renew-period> <jdbc-lock-renew-period>${jdbcLockRenewPeriod}</jdbc-lock-renew-period>
<jdbc-max-allowed-millis-from-db-time>${jdbcMaxAllowedMillisFromDbTime}</jdbc-max-allowed-millis-from-db-time>
<jdbc-network-timeout>${jdbcNetworkTimeout}</jdbc-network-timeout> <jdbc-network-timeout>${jdbcNetworkTimeout}</jdbc-network-timeout>
</database-store> </database-store>
</store> </store>

View File

@ -451,8 +451,6 @@ public final class ActiveMQDefaultConfiguration {
private static final long DEFAULT_JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS = -1; private static final long DEFAULT_JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS = -1;
private static final long DEFAULT_JDBC_MAX_ALLOWED_MILLIS_FROM_DB_TIME = TimeUnit.SECONDS.toMillis(60);
// Default period to wait between connection TTL checks // Default period to wait between connection TTL checks
public static final long DEFAULT_CONNECTION_TTL_CHECK_INTERVAL = 2000; public static final long DEFAULT_CONNECTION_TTL_CHECK_INTERVAL = 2000;
@ -1258,10 +1256,6 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS; return DEFAULT_JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS;
} }
public static long getDefaultJdbcMaxAllowedMillisFromDbTime() {
return DEFAULT_JDBC_MAX_ALLOWED_MILLIS_FROM_DB_TIME;
}
public static long getDefaultConnectionTtlCheckInterval() { public static long getDefaultConnectionTtlCheckInterval() {
return DEFAULT_CONNECTION_TTL_CHECK_INTERVAL; return DEFAULT_CONNECTION_TTL_CHECK_INTERVAL;
} }

View File

@ -38,10 +38,10 @@ count-journal-record=SELECT COUNT(*) FROM %s
create-node-manager-store-table=CREATE TABLE %s (ID INT NOT NULL, HOLDER_ID VARCHAR(128), HOLDER_EXPIRATION_TIME TIMESTAMP, NODE_ID CHAR(36),STATE CHAR(1), PRIMARY KEY(ID)) create-node-manager-store-table=CREATE TABLE %s (ID INT NOT NULL, HOLDER_ID VARCHAR(128), HOLDER_EXPIRATION_TIME TIMESTAMP, NODE_ID CHAR(36),STATE CHAR(1), PRIMARY KEY(ID))
create-state=INSERT INTO %s (ID) VALUES (%s) create-state=INSERT INTO %s (ID) VALUES (%s)
try-acquire-lock=UPDATE %s SET HOLDER_ID = ?, HOLDER_EXPIRATION_TIME = ? WHERE (HOLDER_EXPIRATION_TIME IS NULL OR HOLDER_EXPIRATION_TIME < CURRENT_TIMESTAMP) AND ID = %s try-acquire-lock=UPDATE %s SET HOLDER_ID = ?, HOLDER_EXPIRATION_TIME = ? WHERE (HOLDER_EXPIRATION_TIME IS NULL OR (HOLDER_EXPIRATION_TIME < CURRENT_TIMESTAMP AND ? > CURRENT_TIMESTAMP)) AND ID = %s
try-release-lock=UPDATE %s SET HOLDER_ID = NULL, HOLDER_EXPIRATION_TIME = NULL WHERE HOLDER_ID = ? AND ID = %s try-release-lock=UPDATE %s SET HOLDER_ID = NULL, HOLDER_EXPIRATION_TIME = NULL WHERE HOLDER_ID = ? AND ID = %s
is-locked=SELECT HOLDER_ID, HOLDER_EXPIRATION_TIME FROM %s WHERE ID = %s is-locked=SELECT HOLDER_ID, HOLDER_EXPIRATION_TIME, CURRENT_TIMESTAMP FROM %s WHERE ID = %s
renew-lock=UPDATE %s SET HOLDER_EXPIRATION_TIME = ? WHERE HOLDER_ID = ? AND ID = %s renew-lock=UPDATE %s SET HOLDER_EXPIRATION_TIME = ? WHERE HOLDER_ID = ? AND HOLDER_EXPIRATION_TIME IS NOT NULL AND ? > HOLDER_EXPIRATION_TIME AND ? > CURRENT_TIMESTAMP AND ID = %s
current-timestamp=SELECT CURRENT_TIMESTAMP FROM %s current-timestamp=SELECT CURRENT_TIMESTAMP FROM %s
write-state=UPDATE %s SET STATE = ? WHERE ID = %s write-state=UPDATE %s SET STATE = ? WHERE ID = %s
read-state=SELECT STATE FROM %s WHERE ID = %s read-state=SELECT STATE FROM %s WHERE ID = %s

View File

@ -50,8 +50,6 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
private long jdbcLockAcquisitionTimeoutMillis = ActiveMQDefaultConfiguration.getDefaultJdbcLockAcquisitionTimeoutMillis(); private long jdbcLockAcquisitionTimeoutMillis = ActiveMQDefaultConfiguration.getDefaultJdbcLockAcquisitionTimeoutMillis();
private long jdbcMaxAllowedMillisFromDbTime = ActiveMQDefaultConfiguration.getDefaultJdbcMaxAllowedMillisFromDbTime();
private long jdbcJournalSyncPeriodMillis = ActiveMQDefaultConfiguration.getDefaultJdbcJournalSyncPeriodMillis(); private long jdbcJournalSyncPeriodMillis = ActiveMQDefaultConfiguration.getDefaultJdbcJournalSyncPeriodMillis();
@Override @Override
@ -187,12 +185,4 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
public void setJdbcJournalSyncPeriodMillis(long jdbcJournalSyncPeriodMillis) { public void setJdbcJournalSyncPeriodMillis(long jdbcJournalSyncPeriodMillis) {
this.jdbcJournalSyncPeriodMillis = jdbcJournalSyncPeriodMillis; this.jdbcJournalSyncPeriodMillis = jdbcJournalSyncPeriodMillis;
} }
public long getJdbcMaxAllowedMillisFromDbTime() {
return jdbcMaxAllowedMillisFromDbTime;
}
public void setJdbcMaxAllowedMillisFromDbTime(long jdbcMaxAllowedMillisFromDbTime) {
this.jdbcMaxAllowedMillisFromDbTime = jdbcMaxAllowedMillisFromDbTime;
}
} }

View File

@ -1473,7 +1473,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
conf.setJdbcNetworkTimeout(getInteger(storeNode, "jdbc-network-timeout", conf.getJdbcNetworkTimeout(), Validators.NO_CHECK)); conf.setJdbcNetworkTimeout(getInteger(storeNode, "jdbc-network-timeout", conf.getJdbcNetworkTimeout(), Validators.NO_CHECK));
conf.setJdbcLockRenewPeriodMillis(getLong(storeNode, "jdbc-lock-renew-period", conf.getJdbcLockRenewPeriodMillis(), Validators.NO_CHECK)); conf.setJdbcLockRenewPeriodMillis(getLong(storeNode, "jdbc-lock-renew-period", conf.getJdbcLockRenewPeriodMillis(), Validators.NO_CHECK));
conf.setJdbcLockExpirationMillis(getLong(storeNode, "jdbc-lock-expiration", conf.getJdbcLockExpirationMillis(), Validators.NO_CHECK)); conf.setJdbcLockExpirationMillis(getLong(storeNode, "jdbc-lock-expiration", conf.getJdbcLockExpirationMillis(), Validators.NO_CHECK));
conf.setJdbcMaxAllowedMillisFromDbTime(getLong(storeNode, "jdbc-max-allowed-millis-from-db-time", conf.getJdbcMaxAllowedMillisFromDbTime(), Validators.NO_CHECK));
conf.setJdbcJournalSyncPeriodMillis(getLong(storeNode, "jdbc-journal-sync-period", conf.getJdbcJournalSyncPeriodMillis(), Validators.NO_CHECK)); conf.setJdbcJournalSyncPeriodMillis(getLong(storeNode, "jdbc-journal-sync-period", conf.getJdbcJournalSyncPeriodMillis(), Validators.NO_CHECK));
return conf; return conf;
} }

View File

@ -23,6 +23,7 @@ import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Timestamp; import java.sql.Timestamp;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate; import java.util.function.Predicate;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
@ -35,14 +36,15 @@ final class JdbcLeaseLock implements LeaseLock {
private static final Logger LOGGER = Logger.getLogger(JdbcLeaseLock.class); private static final Logger LOGGER = Logger.getLogger(JdbcLeaseLock.class);
private static final int MAX_HOLDER_ID_LENGTH = 128; private static final int MAX_HOLDER_ID_LENGTH = 128;
private final Connection connection; private final Connection connection;
private long millisDiffFromDbTime;
private final String holderId; private final String holderId;
private final PreparedStatement tryAcquireLock; private final PreparedStatement tryAcquireLock;
private final PreparedStatement tryReleaseLock; private final PreparedStatement tryReleaseLock;
private final PreparedStatement renewLock; private final PreparedStatement renewLock;
private final PreparedStatement isLocked; private final PreparedStatement isLocked;
private final PreparedStatement currentDateTime;
private final long expirationMillis; private final long expirationMillis;
private boolean maybeAcquired; private boolean maybeAcquired;
private final String lockName;
/** /**
* The lock will be responsible (ie {@link #close()}) of all the {@link PreparedStatement}s used by it, but not of the {@link Connection}, * The lock will be responsible (ie {@link #close()}) of all the {@link PreparedStatement}s used by it, but not of the {@link Connection},
@ -54,20 +56,22 @@ final class JdbcLeaseLock implements LeaseLock {
PreparedStatement tryReleaseLock, PreparedStatement tryReleaseLock,
PreparedStatement renewLock, PreparedStatement renewLock,
PreparedStatement isLocked, PreparedStatement isLocked,
PreparedStatement currentDateTime,
long expirationMIllis, long expirationMIllis,
long millisDiffFromDbTime) { String lockName) {
if (holderId.length() > MAX_HOLDER_ID_LENGTH) { if (holderId.length() > MAX_HOLDER_ID_LENGTH) {
throw new IllegalArgumentException("holderId length must be <=" + MAX_HOLDER_ID_LENGTH); throw new IllegalArgumentException("holderId length must be <=" + MAX_HOLDER_ID_LENGTH);
} }
this.holderId = holderId; this.holderId = holderId;
this.millisDiffFromDbTime = millisDiffFromDbTime;
this.tryAcquireLock = tryAcquireLock; this.tryAcquireLock = tryAcquireLock;
this.tryReleaseLock = tryReleaseLock; this.tryReleaseLock = tryReleaseLock;
this.renewLock = renewLock; this.renewLock = renewLock;
this.isLocked = isLocked; this.isLocked = isLocked;
this.currentDateTime = currentDateTime;
this.expirationMillis = expirationMIllis; this.expirationMillis = expirationMIllis;
this.maybeAcquired = false; this.maybeAcquired = false;
this.connection = connection; this.connection = connection;
this.lockName = lockName;
} }
public String holderId() { public String holderId() {
@ -79,32 +83,88 @@ final class JdbcLeaseLock implements LeaseLock {
return expirationMillis; return expirationMillis;
} }
private long timeDifference() { private String readableLockStatus() {
return millisDiffFromDbTime; try {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
try {
final String lockStatus;
final PreparedStatement preparedStatement = this.isLocked;
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (!resultSet.next()) {
lockStatus = null;
} else {
final String currentHolderId = resultSet.getString(1);
final Timestamp expirationTime = resultSet.getTimestamp(2);
final Timestamp currentTimestamp = resultSet.getTimestamp(3);
lockStatus = "holderId = " + currentHolderId + " expirationTime = " + expirationTime + " currentTimestamp = " + currentTimestamp;
}
}
connection.commit();
return lockStatus;
} catch (SQLException ie) {
connection.rollback();
return ie.getMessage();
} finally {
connection.setAutoCommit(autoCommit);
}
} catch (SQLException e) {
return e.getMessage();
}
}
private long dbCurrentTimeMillis() throws SQLException {
final long start = System.nanoTime();
try (ResultSet resultSet = currentDateTime.executeQuery()) {
resultSet.next();
final Timestamp currentTimestamp = resultSet.getTimestamp(1);
final long elapsedTime = System.nanoTime() - start;
if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("[%s] %s query currentTimestamp = %s tooks %d ms",
lockName, holderId, currentTimestamp, TimeUnit.NANOSECONDS.toMillis(elapsedTime));
}
return currentTimestamp.getTime();
}
} }
@Override @Override
public boolean renew() { public boolean renew() {
synchronized (connection) { synchronized (connection) {
try { try {
final boolean result; connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(false); connection.setAutoCommit(false);
try { try {
final long timeDifference = timeDifference();
final PreparedStatement preparedStatement = this.renewLock; final PreparedStatement preparedStatement = this.renewLock;
final long now = System.currentTimeMillis() + timeDifference; final long now = dbCurrentTimeMillis();
final Timestamp timestamp = new Timestamp(now + expirationMillis); final Timestamp expirationTime = new Timestamp(now + expirationMillis);
preparedStatement.setTimestamp(1, timestamp); if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("[%s] %s is renewing lock with expirationTime = %s",
lockName, holderId, expirationTime);
}
preparedStatement.setTimestamp(1, expirationTime);
preparedStatement.setString(2, holderId); preparedStatement.setString(2, holderId);
result = preparedStatement.executeUpdate() == 1; preparedStatement.setTimestamp(3, expirationTime);
preparedStatement.setTimestamp(4, expirationTime);
final int updatedRows = preparedStatement.executeUpdate();
final boolean renewed = updatedRows == 1;
connection.commit();
if (!renewed) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("[%s] %s has failed to renew lock: lock status = { %s }",
lockName, holderId, readableLockStatus());
}
} else {
LOGGER.debugf("[%s] %s has renewed lock", lockName, holderId);
}
return renewed;
} catch (SQLException ie) { } catch (SQLException ie) {
connection.rollback(); connection.rollback();
connection.setAutoCommit(true);
throw new IllegalStateException(ie); throw new IllegalStateException(ie);
} finally {
connection.setAutoCommit(autoCommit);
} }
connection.commit();
connection.setAutoCommit(true);
return result;
} catch (SQLException e) { } catch (SQLException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
@ -115,30 +175,36 @@ final class JdbcLeaseLock implements LeaseLock {
public boolean tryAcquire() { public boolean tryAcquire() {
synchronized (connection) { synchronized (connection) {
try { try {
final boolean acquired; connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(false); connection.setAutoCommit(false);
try { try {
final long timeDifference = timeDifference();
final PreparedStatement preparedStatement = tryAcquireLock; final PreparedStatement preparedStatement = tryAcquireLock;
final long now = System.currentTimeMillis() + timeDifference; final long now = dbCurrentTimeMillis();
preparedStatement.setString(1, holderId); preparedStatement.setString(1, holderId);
final Timestamp timestamp = new Timestamp(now + expirationMillis); final Timestamp expirationTime = new Timestamp(now + expirationMillis);
preparedStatement.setTimestamp(2, timestamp); preparedStatement.setTimestamp(2, expirationTime);
acquired = preparedStatement.executeUpdate() == 1; preparedStatement.setTimestamp(3, expirationTime);
LOGGER.debugf("[%s] %s is trying to acquire lock with expirationTime %s",
lockName, holderId, expirationTime);
final boolean acquired = preparedStatement.executeUpdate() == 1;
connection.commit();
if (acquired) {
this.maybeAcquired = true;
LOGGER.debugf("[%s] %s has acquired lock", lockName, holderId);
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("[%s] %s has failed to acquire lock: lock status = { %s }",
lockName, holderId, readableLockStatus());
}
}
return acquired;
} catch (SQLException ie) { } catch (SQLException ie) {
connection.rollback(); connection.rollback();
connection.setAutoCommit(true);
throw new IllegalStateException(ie); throw new IllegalStateException(ie);
} finally {
connection.setAutoCommit(autoCommit);
} }
connection.commit();
connection.setAutoCommit(true);
if (acquired) {
this.maybeAcquired = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(holderId + " has acquired a lock");
}
}
return acquired;
} catch (SQLException e) { } catch (SQLException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
@ -158,10 +224,11 @@ final class JdbcLeaseLock implements LeaseLock {
private boolean checkValidHolderId(Predicate<? super String> holderIdFilter) { private boolean checkValidHolderId(Predicate<? super String> holderIdFilter) {
synchronized (connection) { synchronized (connection) {
try { try {
boolean result; connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(false); connection.setAutoCommit(false);
try { try {
final long timeDifference = timeDifference(); boolean result;
final PreparedStatement preparedStatement = this.isLocked; final PreparedStatement preparedStatement = this.isLocked;
try (ResultSet resultSet = preparedStatement.executeQuery()) { try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (!resultSet.next()) { if (!resultSet.next()) {
@ -169,29 +236,33 @@ final class JdbcLeaseLock implements LeaseLock {
} else { } else {
final String currentHolderId = resultSet.getString(1); final String currentHolderId = resultSet.getString(1);
result = holderIdFilter.test(currentHolderId); result = holderIdFilter.test(currentHolderId);
//warn about any zombie lock final Timestamp expirationTime = resultSet.getTimestamp(2);
final Timestamp timestamp = resultSet.getTimestamp(2); final Timestamp currentTimestamp = resultSet.getTimestamp(3);
if (timestamp != null) { final long currentTimestampMillis = currentTimestamp.getTime();
final long lockExpirationTime = timestamp.getTime(); boolean zombie = false;
final long now = System.currentTimeMillis() + timeDifference; if (expirationTime != null) {
final long expiredBy = now - lockExpirationTime; final long lockExpirationTime = expirationTime.getTime();
final long expiredBy = currentTimestampMillis - lockExpirationTime;
if (expiredBy > 0) { if (expiredBy > 0) {
result = false; result = false;
if (LOGGER.isDebugEnabled()) { zombie = true;
LOGGER.debug("found zombie lock with holderId: " + currentHolderId + " expired by: " + expiredBy + " ms");
}
} }
} }
if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("[%s] %s has found %s with holderId = %s expirationTime = %s currentTimestamp = %s",
lockName, holderId, zombie ? "zombie lock" : "lock",
currentHolderId, expirationTime, currentTimestamp);
}
} }
} }
connection.commit();
return result;
} catch (SQLException ie) { } catch (SQLException ie) {
connection.rollback(); connection.rollback();
connection.setAutoCommit(true);
throw new IllegalStateException(ie); throw new IllegalStateException(ie);
} finally {
connection.setAutoCommit(autoCommit);
} }
connection.commit();
connection.setAutoCommit(true);
return result;
} catch (SQLException e) { } catch (SQLException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
@ -202,26 +273,30 @@ final class JdbcLeaseLock implements LeaseLock {
public void release() { public void release() {
synchronized (connection) { synchronized (connection) {
try { try {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(false); connection.setAutoCommit(false);
try { try {
final PreparedStatement preparedStatement = this.tryReleaseLock; final PreparedStatement preparedStatement = this.tryReleaseLock;
preparedStatement.setString(1, holderId); preparedStatement.setString(1, holderId);
if (preparedStatement.executeUpdate() != 1) { final boolean released = preparedStatement.executeUpdate() == 1;
LOGGER.warn(holderId + " has failed to release a lock");
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(holderId + " has released a lock");
}
}
//consider it as released to avoid on finalize to be reclaimed //consider it as released to avoid on finalize to be reclaimed
this.maybeAcquired = false; this.maybeAcquired = false;
connection.commit();
if (!released) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("[%s] %s has failed to release lock: lock status = { %s }",
lockName, holderId, readableLockStatus());
}
} else {
LOGGER.debugf("[%s] %s has released lock", lockName, holderId);
}
} catch (SQLException ie) { } catch (SQLException ie) {
connection.rollback(); connection.rollback();
connection.setAutoCommit(true);
throw new IllegalStateException(ie); throw new IllegalStateException(ie);
} finally {
connection.setAutoCommit(autoCommit);
} }
connection.commit();
connection.setAutoCommit(true);
} catch (SQLException e) { } catch (SQLException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
@ -242,6 +317,7 @@ final class JdbcLeaseLock implements LeaseLock {
this.tryAcquireLock.close(); this.tryAcquireLock.close();
this.renewLock.close(); this.renewLock.close();
this.isLocked.close(); this.isLocked.close();
this.currentDateTime.close();
} }
} }
} }

View File

@ -41,7 +41,7 @@ import org.jboss.logging.Logger;
*/ */
public final class JdbcNodeManager extends NodeManager { public final class JdbcNodeManager extends NodeManager {
private static final Logger logger = Logger.getLogger(JdbcNodeManager.class); private static final Logger LOGGER = Logger.getLogger(JdbcNodeManager.class);
private static final long MAX_PAUSE_MILLIS = 2000L; private static final long MAX_PAUSE_MILLIS = 2000L;
private final Supplier<? extends SharedStateManager> sharedStateManagerFactory; private final Supplier<? extends SharedStateManager> sharedStateManagerFactory;
@ -50,7 +50,6 @@ public final class JdbcNodeManager extends NodeManager {
private SharedStateManager sharedStateManager; private SharedStateManager sharedStateManager;
private ScheduledLeaseLock scheduledLiveLock; private ScheduledLeaseLock scheduledLiveLock;
private ScheduledLeaseLock scheduledBackupLock; private ScheduledLeaseLock scheduledBackupLock;
private final long lockRenewPeriodMillis;
private final long lockAcquisitionTimeoutMillis; private final long lockAcquisitionTimeoutMillis;
private volatile boolean interrupted = false; private volatile boolean interrupted = false;
private final LeaseLock.Pauser pauser; private final LeaseLock.Pauser pauser;
@ -74,7 +73,6 @@ public final class JdbcNodeManager extends NodeManager {
configuration.getJdbcLockExpirationMillis(), configuration.getJdbcLockExpirationMillis(),
configuration.getJdbcLockRenewPeriodMillis(), configuration.getJdbcLockRenewPeriodMillis(),
configuration.getJdbcLockAcquisitionTimeoutMillis(), configuration.getJdbcLockAcquisitionTimeoutMillis(),
configuration.getJdbcMaxAllowedMillisFromDbTime(),
configuration.getDataSource(), configuration.getDataSource(),
sqlProviderFactory.create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER), sqlProviderFactory.create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER),
scheduledExecutorService, scheduledExecutorService,
@ -88,7 +86,6 @@ public final class JdbcNodeManager extends NodeManager {
configuration.getJdbcLockExpirationMillis(), configuration.getJdbcLockExpirationMillis(),
configuration.getJdbcLockRenewPeriodMillis(), configuration.getJdbcLockRenewPeriodMillis(),
configuration.getJdbcLockAcquisitionTimeoutMillis(), configuration.getJdbcLockAcquisitionTimeoutMillis(),
configuration.getJdbcMaxAllowedMillisFromDbTime(),
configuration.getJdbcConnectionUrl(), configuration.getJdbcConnectionUrl(),
configuration.getJdbcDriverClassName(), configuration.getJdbcDriverClassName(),
sqlProvider, sqlProvider,
@ -103,7 +100,6 @@ public final class JdbcNodeManager extends NodeManager {
long lockExpirationMillis, long lockExpirationMillis,
long lockRenewPeriodMillis, long lockRenewPeriodMillis,
long lockAcquisitionTimeoutMillis, long lockAcquisitionTimeoutMillis,
long maxAllowedMillisFromDbTime,
DataSource dataSource, DataSource dataSource,
SQLProvider provider, SQLProvider provider,
ScheduledExecutorService scheduledExecutorService, ScheduledExecutorService scheduledExecutorService,
@ -114,10 +110,8 @@ public final class JdbcNodeManager extends NodeManager {
networkTimeoutMillis, networkTimeoutMillis,
executorFactory == null ? null : executorFactory.getExecutor(), executorFactory == null ? null : executorFactory.getExecutor(),
lockExpirationMillis, lockExpirationMillis,
maxAllowedMillisFromDbTime,
dataSource, dataSource,
provider), provider),
false,
lockRenewPeriodMillis, lockRenewPeriodMillis,
lockAcquisitionTimeoutMillis, lockAcquisitionTimeoutMillis,
scheduledExecutorService, scheduledExecutorService,
@ -130,7 +124,6 @@ public final class JdbcNodeManager extends NodeManager {
long lockExpirationMillis, long lockExpirationMillis,
long lockRenewPeriodMillis, long lockRenewPeriodMillis,
long lockAcquisitionTimeoutMillis, long lockAcquisitionTimeoutMillis,
long maxAllowedMillisFromDbTime,
String jdbcUrl, String jdbcUrl,
String driverClass, String driverClass,
SQLProvider provider, SQLProvider provider,
@ -142,11 +135,9 @@ public final class JdbcNodeManager extends NodeManager {
networkTimeoutMillis, networkTimeoutMillis,
executorFactory == null ? null : executorFactory.getExecutor(), executorFactory == null ? null : executorFactory.getExecutor(),
lockExpirationMillis, lockExpirationMillis,
maxAllowedMillisFromDbTime,
jdbcUrl, jdbcUrl,
driverClass, driverClass,
provider), provider),
false,
lockRenewPeriodMillis, lockRenewPeriodMillis,
lockAcquisitionTimeoutMillis, lockAcquisitionTimeoutMillis,
scheduledExecutorService, scheduledExecutorService,
@ -169,24 +160,22 @@ public final class JdbcNodeManager extends NodeManager {
final int networkTimeout = configuration.getJdbcNetworkTimeout(); final int networkTimeout = configuration.getJdbcNetworkTimeout();
if (networkTimeout >= 0) { if (networkTimeout >= 0) {
if (networkTimeout > lockExpiration) { if (networkTimeout > lockExpiration) {
logger.warn("jdbc-network-timeout isn't properly configured: the recommended value is <= jdbc-lock-expiration"); LOGGER.warn("jdbc-network-timeout isn't properly configured: the recommended value is <= jdbc-lock-expiration");
} }
} else { } else {
logger.warn("jdbc-network-timeout isn't properly configured: the recommended value is <= jdbc-lock-expiration"); LOGGER.warn("jdbc-network-timeout isn't properly configured: the recommended value is <= jdbc-lock-expiration");
} }
} }
private JdbcNodeManager(Supplier<? extends SharedStateManager> sharedStateManagerFactory, private JdbcNodeManager(Supplier<? extends SharedStateManager> sharedStateManagerFactory,
boolean replicatedBackup,
long lockRenewPeriodMillis, long lockRenewPeriodMillis,
long lockAcquisitionTimeoutMillis, long lockAcquisitionTimeoutMillis,
ScheduledExecutorService scheduledExecutorService, ScheduledExecutorService scheduledExecutorService,
ExecutorFactory executorFactory, ExecutorFactory executorFactory,
IOCriticalErrorListener ioCriticalErrorListener) { IOCriticalErrorListener ioCriticalErrorListener) {
super(replicatedBackup, null); super(false, null);
this.lockAcquisitionTimeoutMillis = lockAcquisitionTimeoutMillis; this.lockAcquisitionTimeoutMillis = lockAcquisitionTimeoutMillis;
this.lockRenewPeriodMillis = lockRenewPeriodMillis; this.pauser = LeaseLock.Pauser.sleep(Math.min(lockRenewPeriodMillis, MAX_PAUSE_MILLIS), TimeUnit.MILLISECONDS);
this.pauser = LeaseLock.Pauser.sleep(Math.min(this.lockRenewPeriodMillis, MAX_PAUSE_MILLIS), TimeUnit.MILLISECONDS);
this.sharedStateManagerFactory = sharedStateManagerFactory; this.sharedStateManagerFactory = sharedStateManagerFactory;
this.scheduledLiveLockFactory = () -> ScheduledLeaseLock.of( this.scheduledLiveLockFactory = () -> ScheduledLeaseLock.of(
scheduledExecutorService, scheduledExecutorService,
@ -217,10 +206,9 @@ public final class JdbcNodeManager extends NodeManager {
return; return;
} }
this.sharedStateManager = sharedStateManagerFactory.get(); this.sharedStateManager = sharedStateManagerFactory.get();
if (!replicatedBackup) { LOGGER.debug("setup sharedStateManager on start");
final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID); final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID);
setUUID(nodeId); setUUID(nodeId);
}
this.scheduledLiveLock = scheduledLiveLockFactory.get(); this.scheduledLiveLock = scheduledLiveLockFactory.get();
this.scheduledBackupLock = scheduledBackupLockFactory.get(); this.scheduledBackupLock = scheduledBackupLockFactory.get();
super.start(); super.start();
@ -259,35 +247,62 @@ public final class JdbcNodeManager extends NodeManager {
@Override @Override
public boolean isAwaitingFailback() throws Exception { public boolean isAwaitingFailback() throws Exception {
return readSharedState() == SharedStateManager.State.FAILING_BACK; LOGGER.debug("ENTER isAwaitingFailback");
try {
return readSharedState() == SharedStateManager.State.FAILING_BACK;
} finally {
LOGGER.debug("EXIT isAwaitingFailback");
}
} }
@Override @Override
public boolean isBackupLive() throws Exception { public boolean isBackupLive() throws Exception {
//is anyone holding the live lock? LOGGER.debug("ENTER isBackupLive");
return this.scheduledLiveLock.lock().isHeld(); try {
//is anyone holding the live lock?
return this.scheduledLiveLock.lock().isHeld();
} finally {
LOGGER.debug("EXIT isBackupLive");
}
} }
@Override @Override
public void stopBackup() throws Exception { public void stopBackup() throws Exception {
if (replicatedBackup) { LOGGER.debug("ENTER stopBackup");
final UUID nodeId = getUUID(); try {
sharedStateManager.writeNodeId(nodeId); if (this.scheduledBackupLock.isStarted()) {
LOGGER.debug("scheduledBackupLock is running: stop it and release backup lock");
this.scheduledBackupLock.stop();
this.scheduledBackupLock.lock().release();
} else {
LOGGER.debug("scheduledBackupLock is not running");
}
} finally {
LOGGER.debug("EXIT stopBackup");
} }
releaseBackup();
} }
@Override @Override
public void interrupt() { public void interrupt() {
LOGGER.debug("ENTER interrupted");
//need to be volatile: must be called concurrently to work as expected //need to be volatile: must be called concurrently to work as expected
interrupted = true; interrupted = true;
LOGGER.debug("EXIT interrupted");
} }
@Override @Override
public void releaseBackup() throws Exception { public void releaseBackup() throws Exception {
if (this.scheduledBackupLock.lock().isHeldByCaller()) { LOGGER.debug("ENTER releaseBackup");
this.scheduledBackupLock.stop(); try {
this.scheduledBackupLock.lock().release(); if (this.scheduledBackupLock.isStarted()) {
LOGGER.debug("scheduledBackupLock is running: stop it and release backup lock");
this.scheduledBackupLock.stop();
this.scheduledBackupLock.lock().release();
} else {
LOGGER.debug("scheduledBackupLock is not running");
}
} finally {
LOGGER.debug("EXIT releaseBackup");
} }
} }
@ -322,11 +337,8 @@ public final class JdbcNodeManager extends NodeManager {
if (acquiredMillis > this.scheduledLiveLock.renewPeriodMillis()) { if (acquiredMillis > this.scheduledLiveLock.renewPeriodMillis()) {
if (!this.scheduledLiveLock.lock().renew()) { if (!this.scheduledLiveLock.lock().renew()) {
final IllegalStateException e = new IllegalStateException("live lock can't be renewed"); final IllegalStateException e = new IllegalStateException("live lock can't be renewed");
try { ioCriticalErrorListener.onIOException(e, "live lock can't be renewed", null);
ioCriticalErrorListener.onIOException(e, "live lock can't be renewed", null); throw e;
} finally {
throw e;
}
} }
} }
} }
@ -343,7 +355,7 @@ public final class JdbcNodeManager extends NodeManager {
try { try {
stateWhileLocked = readSharedState(); stateWhileLocked = readSharedState();
} catch (Throwable t) { } catch (Throwable t) {
logger.error("error while holding the live node lock and tried to read the shared state", t); LOGGER.error("error while holding the live node lock and tried to read the shared state", t);
this.scheduledLiveLock.lock().release(); this.scheduledLiveLock.lock().release();
throw t; throw t;
} }
@ -351,9 +363,7 @@ public final class JdbcNodeManager extends NodeManager {
renewLiveLockIfNeeded(acquiredOn); renewLiveLockIfNeeded(acquiredOn);
liveWhileLocked = true; liveWhileLocked = true;
} else { } else {
if (logger.isDebugEnabled()) { LOGGER.debugf("state is %s while holding the live lock: releasing live lock", stateWhileLocked);
logger.debug("state is " + stateWhileLocked + " while holding the live lock");
}
//state is not live: can (try to) release the lock //state is not live: can (try to) release the lock
this.scheduledLiveLock.lock().release(); this.scheduledLiveLock.lock().release();
} }
@ -362,98 +372,133 @@ public final class JdbcNodeManager extends NodeManager {
@Override @Override
public void awaitLiveNode() throws Exception { public void awaitLiveNode() throws Exception {
boolean liveWhileLocked = false; LOGGER.debug("ENTER awaitLiveNode");
while (!liveWhileLocked) { try {
//check first without holding any lock boolean liveWhileLocked = false;
final SharedStateManager.State state = readSharedState(); while (!liveWhileLocked) {
if (state == SharedStateManager.State.LIVE) { //check first without holding any lock
//verify if the state is live while holding the live node lock too final SharedStateManager.State state = readSharedState();
liveWhileLocked = lockLiveAndCheckLiveState(); if (state == SharedStateManager.State.LIVE) {
} else { //verify if the state is live while holding the live node lock too
if (logger.isDebugEnabled()) { liveWhileLocked = lockLiveAndCheckLiveState();
logger.debug("awaiting live node...state: " + state); } else {
LOGGER.debugf("state while awaiting live node: %s", state);
}
if (!liveWhileLocked) {
checkInterrupted(() -> "awaitLiveNode got interrupted!");
pauser.idle();
} }
} }
if (!liveWhileLocked) { //state is LIVE and live lock is acquired and valid
checkInterrupted(() -> "awaitLiveNode got interrupted!"); LOGGER.debugf("acquired live node lock while state is %s: starting scheduledLiveLock", SharedStateManager.State.LIVE);
pauser.idle(); this.scheduledLiveLock.start();
} } finally {
LOGGER.debug("EXIT awaitLiveNode");
} }
//state is LIVE and live lock is acquired and valid
logger.debug("acquired live node lock");
this.scheduledLiveLock.start();
} }
@Override @Override
public void startBackup() throws Exception { public void startBackup() throws Exception {
assert !replicatedBackup; // should not be called if this is a replicating backup LOGGER.debug("ENTER startBackup");
ActiveMQServerLogger.LOGGER.waitingToBecomeBackup(); try {
ActiveMQServerLogger.LOGGER.waitingToBecomeBackup();
lock(scheduledBackupLock.lock()); lock(scheduledBackupLock.lock());
scheduledBackupLock.start(); scheduledBackupLock.start();
ActiveMQServerLogger.LOGGER.gotBackupLock(); ActiveMQServerLogger.LOGGER.gotBackupLock();
if (getUUID() == null) if (getUUID() == null)
readNodeId(); readNodeId();
} finally {
LOGGER.debug("EXIT startBackup");
}
} }
@Override @Override
public ActivateCallback startLiveNode() throws Exception { public ActivateCallback startLiveNode() throws Exception {
setFailingBack(); LOGGER.debug("ENTER startLiveNode");
try {
setFailingBack();
final String timeoutMessage = lockAcquisitionTimeoutMillis == -1 ? "indefinitely" : lockAcquisitionTimeoutMillis + " milliseconds"; final String timeoutMessage = lockAcquisitionTimeoutMillis == -1 ? "indefinitely" : lockAcquisitionTimeoutMillis + " milliseconds";
ActiveMQServerLogger.LOGGER.waitingToObtainLiveLock(timeoutMessage); ActiveMQServerLogger.LOGGER.waitingToObtainLiveLock(timeoutMessage);
lock(this.scheduledLiveLock.lock()); lock(this.scheduledLiveLock.lock());
this.scheduledLiveLock.start(); this.scheduledLiveLock.start();
ActiveMQServerLogger.LOGGER.obtainedLiveLock(); ActiveMQServerLogger.LOGGER.obtainedLiveLock();
return new ActivateCallback() { return new ActivateCallback() {
@Override @Override
public void activationComplete() { public void activationComplete() {
try { LOGGER.debug("ENTER activationComplete");
//state can be written only if the live renew task is running try {
setLive(); //state can be written only if the live renew task is running
} catch (Exception e) { setLive();
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); } catch (Exception e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
} finally {
LOGGER.debug("EXIT activationComplete");
}
} }
} };
}; } finally {
LOGGER.debug("EXIT startLiveNode");
}
} }
@Override @Override
public void pauseLiveServer() throws Exception { public void pauseLiveServer() throws Exception {
if (scheduledLiveLock.isStarted()) { LOGGER.debug("ENTER pauseLiveServer");
setPaused(); try {
scheduledLiveLock.stop(); if (scheduledLiveLock.isStarted()) {
scheduledLiveLock.lock().release(); LOGGER.debug("scheduledLiveLock is running: set paused shared state, stop it and release live lock");
} else if (scheduledLiveLock.lock().renew()) { setPaused();
setPaused(); scheduledLiveLock.stop();
scheduledLiveLock.lock().release(); scheduledLiveLock.lock().release();
} else { } else {
final IllegalStateException e = new IllegalStateException("live lock can't be renewed"); LOGGER.debug("scheduledLiveLock is not running: try renew live lock");
try { if (scheduledLiveLock.lock().renew()) {
ioCriticalErrorListener.onIOException(e, "live lock can't be renewed on pauseLiveServer", null); LOGGER.debug("live lock renewed: set paused shared state and release live lock");
} finally { setPaused();
throw e; scheduledLiveLock.lock().release();
} else {
final IllegalStateException e = new IllegalStateException("live lock can't be renewed");
ioCriticalErrorListener.onIOException(e, "live lock can't be renewed on pauseLiveServer", null);
throw e;
}
} }
} finally {
LOGGER.debug("EXIT pauseLiveServer");
} }
} }
@Override @Override
public void crashLiveServer() throws Exception { public void crashLiveServer() throws Exception {
if (this.scheduledLiveLock.lock().isHeldByCaller()) { LOGGER.debug("ENTER crashLiveServer");
scheduledLiveLock.stop(); try {
this.scheduledLiveLock.lock().release(); if (this.scheduledLiveLock.isStarted()) {
LOGGER.debug("scheduledLiveLock is running: request stop it and release live lock");
this.scheduledLiveLock.stop();
this.scheduledLiveLock.lock().release();
} else {
LOGGER.debug("scheduledLiveLock is not running");
}
} finally {
LOGGER.debug("EXIT crashLiveServer");
} }
} }
@Override @Override
public void awaitLiveStatus() { public void awaitLiveStatus() {
while (readSharedState() != SharedStateManager.State.LIVE) { LOGGER.debug("ENTER awaitLiveStatus");
pauser.idle(); try {
while (readSharedState() != SharedStateManager.State.LIVE) {
pauser.idle();
}
} finally {
LOGGER.debug("EXIT awaitLiveStatus");
} }
} }
@ -470,17 +515,20 @@ public final class JdbcNodeManager extends NodeManager {
} }
private void writeSharedState(SharedStateManager.State state) { private void writeSharedState(SharedStateManager.State state) {
assert !this.replicatedBackup : "the replicated backup can't write the shared state!"; LOGGER.debugf("writeSharedState state = %s", state);
this.sharedStateManager.writeState(state); this.sharedStateManager.writeState(state);
} }
private SharedStateManager.State readSharedState() { private SharedStateManager.State readSharedState() {
return this.sharedStateManager.readState(); final SharedStateManager.State state = this.sharedStateManager.readState();
LOGGER.debugf("readSharedState state = %s", state);
return state;
} }
@Override @Override
public SimpleString readNodeId() { public SimpleString readNodeId() {
final UUID nodeId = this.sharedStateManager.readNodeId(); final UUID nodeId = this.sharedStateManager.readNodeId();
LOGGER.debugf("readNodeId nodeId = %s", nodeId);
setUUID(nodeId); setUUID(nodeId);
return getNodeId(); return getNodeId();
} }

View File

@ -22,8 +22,6 @@ import java.sql.Connection;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.function.Supplier; import java.util.function.Supplier;
@ -39,10 +37,9 @@ import org.jboss.logging.Logger;
final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedStateManager { final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedStateManager {
private static final Logger logger = Logger.getLogger(JdbcSharedStateManager.class); private static final Logger logger = Logger.getLogger(JdbcSharedStateManager.class);
public static final int MAX_SETUP_ATTEMPTS = 20; private static final int MAX_SETUP_ATTEMPTS = 20;
private final String holderId; private final String holderId;
private final long lockExpirationMillis; private final long lockExpirationMillis;
private final long maxAllowedMillisFromDbTime;
private JdbcLeaseLock liveLock; private JdbcLeaseLock liveLock;
private JdbcLeaseLock backupLock; private JdbcLeaseLock backupLock;
private PreparedStatement readNodeId; private PreparedStatement readNodeId;
@ -50,16 +47,14 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
private PreparedStatement initializeNodeId; private PreparedStatement initializeNodeId;
private PreparedStatement readState; private PreparedStatement readState;
private PreparedStatement writeState; private PreparedStatement writeState;
private long timeDifferenceMillisFromDb = 0;
public static JdbcSharedStateManager usingDataSource(String holderId, public static JdbcSharedStateManager usingDataSource(String holderId,
int networkTimeout, int networkTimeout,
Executor networkTimeoutExecutor, Executor networkTimeoutExecutor,
long locksExpirationMillis, long locksExpirationMillis,
long maxAllowedMillisFromDbTime,
DataSource dataSource, DataSource dataSource,
SQLProvider provider) { SQLProvider provider) {
final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis, maxAllowedMillisFromDbTime); final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis);
sharedStateManager.setNetworkTimeout(networkTimeoutExecutor, networkTimeout); sharedStateManager.setNetworkTimeout(networkTimeoutExecutor, networkTimeout);
sharedStateManager.setDataSource(dataSource); sharedStateManager.setDataSource(dataSource);
sharedStateManager.setSqlProvider(provider); sharedStateManager.setSqlProvider(provider);
@ -73,7 +68,6 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
public static JdbcSharedStateManager usingConnectionUrl(String holderId, public static JdbcSharedStateManager usingConnectionUrl(String holderId,
long locksExpirationMillis, long locksExpirationMillis,
long maxAllowedMillisFromDbTime,
String jdbcConnectionUrl, String jdbcConnectionUrl,
String jdbcDriverClass, String jdbcDriverClass,
SQLProvider provider) { SQLProvider provider) {
@ -81,7 +75,6 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
-1, -1,
null, null,
locksExpirationMillis, locksExpirationMillis,
maxAllowedMillisFromDbTime,
jdbcConnectionUrl, jdbcConnectionUrl,
jdbcDriverClass, jdbcDriverClass,
provider); provider);
@ -91,11 +84,10 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
int networkTimeout, int networkTimeout,
Executor networkTimeoutExecutor, Executor networkTimeoutExecutor,
long locksExpirationMillis, long locksExpirationMillis,
long maxAllowedMillisFromDbTime,
String jdbcConnectionUrl, String jdbcConnectionUrl,
String jdbcDriverClass, String jdbcDriverClass,
SQLProvider provider) { SQLProvider provider) {
final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis, maxAllowedMillisFromDbTime); final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis);
sharedStateManager.setNetworkTimeout(networkTimeoutExecutor, networkTimeout); sharedStateManager.setNetworkTimeout(networkTimeoutExecutor, networkTimeout);
sharedStateManager.setJdbcConnectionUrl(jdbcConnectionUrl); sharedStateManager.setJdbcConnectionUrl(jdbcConnectionUrl);
sharedStateManager.setJdbcDriverClass(jdbcDriverClass); sharedStateManager.setJdbcDriverClass(jdbcDriverClass);
@ -109,63 +101,33 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
} }
@Override @Override
protected void createSchema() throws SQLException { protected void createSchema() {
try { try {
createTable(sqlProvider.createNodeManagerStoreTableSQL(), sqlProvider.createNodeIdSQL(), sqlProvider.createStateSQL(), sqlProvider.createLiveLockSQL(), sqlProvider.createBackupLockSQL()); createTable(sqlProvider.createNodeManagerStoreTableSQL(), sqlProvider.createNodeIdSQL(), sqlProvider.createStateSQL(), sqlProvider.createLiveLockSQL(), sqlProvider.createBackupLockSQL());
} catch (SQLException e) { } catch (SQLException e) {
//no op: if a table already exists is not a problem in this case, the prepareStatements() call will fail right after it if the table is not correctly initialized //no op: if a table already exists is not a problem in this case, the prepareStatements() call will fail right after it if the table is not correctly initialized
if (logger.isDebugEnabled()) { logger.debug("Error while creating the schema of the JDBC shared state manager", e);
logger.debug("Error while creating the schema of the JDBC shared state manager", e);
}
}
}
/**
* It computes the distance in milliseconds of {@link System#currentTimeMillis()} from the DBMS time.<br>
* It must be added to {@link System#currentTimeMillis()} in order to approximate the DBMS time.
* It will create a transaction by its own.
*/
static long timeDifferenceMillisFromDb(Connection connection, SQLProvider sqlProvider) throws SQLException {
try (Statement statement = connection.createStatement()) {
connection.setAutoCommit(false);
final long result;
try (ResultSet resultSet = statement.executeQuery(sqlProvider.currentTimestampSQL())) {
resultSet.next();
final Timestamp timestamp = resultSet.getTimestamp(1);
final long systemNow = System.currentTimeMillis();
result = timestamp.getTime() - systemNow;
} catch (SQLException ie) {
connection.rollback();
connection.setAutoCommit(true);
throw ie;
}
connection.commit();
connection.setAutoCommit(true);
return result;
} }
} }
static JdbcLeaseLock createLiveLock(String holderId, static JdbcLeaseLock createLiveLock(String holderId,
Connection connection, Connection connection,
SQLProvider sqlProvider, SQLProvider sqlProvider,
long expirationMillis, long expirationMillis) throws SQLException {
long timeDifferenceMillisFromDb) throws SQLException { return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireLiveLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseLiveLockSQL()), connection.prepareStatement(sqlProvider.renewLiveLockSQL()), connection.prepareStatement(sqlProvider.isLiveLockedSQL()), connection.prepareStatement(sqlProvider.currentTimestampSQL()), expirationMillis, "LIVE");
return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireLiveLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseLiveLockSQL()), connection.prepareStatement(sqlProvider.renewLiveLockSQL()), connection.prepareStatement(sqlProvider.isLiveLockedSQL()), expirationMillis, timeDifferenceMillisFromDb);
} }
static JdbcLeaseLock createBackupLock(String holderId, static JdbcLeaseLock createBackupLock(String holderId,
Connection connection, Connection connection,
SQLProvider sqlProvider, SQLProvider sqlProvider,
long expirationMillis, long expirationMillis) throws SQLException {
long timeDifferenceMillisFromDb) throws SQLException { return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireBackupLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseBackupLockSQL()), connection.prepareStatement(sqlProvider.renewBackupLockSQL()), connection.prepareStatement(sqlProvider.isBackupLockedSQL()), connection.prepareStatement(sqlProvider.currentTimestampSQL()), expirationMillis, "BACKUP");
return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireBackupLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseBackupLockSQL()), connection.prepareStatement(sqlProvider.renewBackupLockSQL()), connection.prepareStatement(sqlProvider.isBackupLockedSQL()), expirationMillis, timeDifferenceMillisFromDb);
} }
@Override @Override
protected void prepareStatements() throws SQLException { protected void prepareStatements() throws SQLException {
final long timeDifferenceMillisFromDb = validateTimeDifferenceMillisFromDb(); this.liveLock = createLiveLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis);
this.liveLock = createLiveLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis, timeDifferenceMillisFromDb); this.backupLock = createBackupLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis);
this.backupLock = createBackupLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis, timeDifferenceMillisFromDb);
this.readNodeId = connection.prepareStatement(sqlProvider.readNodeIdSQL()); this.readNodeId = connection.prepareStatement(sqlProvider.readNodeIdSQL());
this.writeNodeId = connection.prepareStatement(sqlProvider.writeNodeIdSQL()); this.writeNodeId = connection.prepareStatement(sqlProvider.writeNodeIdSQL());
this.initializeNodeId = connection.prepareStatement(sqlProvider.initializeNodeIdSQL()); this.initializeNodeId = connection.prepareStatement(sqlProvider.initializeNodeIdSQL());
@ -173,32 +135,9 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
this.readState = connection.prepareStatement(sqlProvider.readStateSQL()); this.readState = connection.prepareStatement(sqlProvider.readStateSQL());
} }
/** private JdbcSharedStateManager(String holderId, long lockExpirationMillis) {
* It will be populated only after a {@link #start()}.
*/
long timeDifferenceMillisFromDb() {
return timeDifferenceMillisFromDb;
}
private long validateTimeDifferenceMillisFromDb() throws SQLException {
final long timeDifferenceMillisFromDb = timeDifferenceMillisFromDb(connection, sqlProvider);
this.timeDifferenceMillisFromDb = timeDifferenceMillisFromDb;
final long absoluteTimeDifference = Math.abs(timeDifferenceMillisFromDb);
if (absoluteTimeDifference > maxAllowedMillisFromDbTime) {
throw new IllegalStateException("The system is far " + (-timeDifferenceMillisFromDb) + " milliseconds from DB time, exceeding maxAllowedMillisFromDbTime = " + maxAllowedMillisFromDbTime);
}
if (absoluteTimeDifference > 0) {
final String msg = "The system is far " + timeDifferenceMillisFromDb + " milliseconds from DB time";
final Logger.Level logLevel = absoluteTimeDifference > lockExpirationMillis ? Logger.Level.WARN : Logger.Level.DEBUG;
logger.log(logLevel, msg);
}
return timeDifferenceMillisFromDb;
}
private JdbcSharedStateManager(String holderId, long lockExpirationMillis, long maxAllowedMillisFromDbTime) {
this.holderId = holderId; this.holderId = holderId;
this.lockExpirationMillis = lockExpirationMillis; this.lockExpirationMillis = lockExpirationMillis;
this.maxAllowedMillisFromDbTime = maxAllowedMillisFromDbTime;
} }
@Override @Override
@ -232,9 +171,13 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
synchronized (connection) { synchronized (connection) {
try { try {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(true); connection.setAutoCommit(true);
final UUID nodeId = rawReadNodeId(); try {
return nodeId; return rawReadNodeId();
} finally {
connection.setAutoCommit(autoCommit);
}
} catch (SQLException e) { } catch (SQLException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
@ -246,8 +189,13 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
synchronized (connection) { synchronized (connection) {
try { try {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(true); connection.setAutoCommit(true);
rawWriteNodeId(nodeId); try {
rawWriteNodeId(nodeId);
} finally {
connection.setAutoCommit(autoCommit);
}
} catch (SQLException e) { } catch (SQLException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
@ -258,7 +206,7 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
final PreparedStatement preparedStatement = this.writeNodeId; final PreparedStatement preparedStatement = this.writeNodeId;
preparedStatement.setString(1, nodeId.toString()); preparedStatement.setString(1, nodeId.toString());
if (preparedStatement.executeUpdate() != 1) { if (preparedStatement.executeUpdate() != 1) {
throw new IllegalStateException("can't write NODE_ID on the JDBC Node Manager Store!"); throw new IllegalStateException("can't write NodeId on the JDBC Node Manager Store!");
} }
} }
@ -283,9 +231,7 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
return nodeId; return nodeId;
} }
} catch (SQLException e) { } catch (SQLException e) {
if (logger.isDebugEnabled()) { logger.debug("Error while attempting to setup the NodeId", e);
logger.debug("Error while attempting to setup the NodeId", e);
}
lastError = e; lastError = e;
} }
} }
@ -299,36 +245,34 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
} }
private UUID initializeOrReadNodeId(final UUID newNodeId) throws SQLException { private UUID initializeOrReadNodeId(final UUID newNodeId) throws SQLException {
final UUID nodeId; synchronized (connection) {
connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
connection.setAutoCommit(false); final boolean autoCommit = connection.getAutoCommit();
try { connection.setAutoCommit(false);
//optimistic try to initialize nodeId try {
if (rawInitializeNodeId(newNodeId)) { final UUID nodeId;
nodeId = newNodeId; //optimistic try to initialize nodeId
} else { if (rawInitializeNodeId(newNodeId)) {
nodeId = rawReadNodeId(); nodeId = newNodeId;
} else {
nodeId = rawReadNodeId();
}
if (nodeId != null) {
connection.commit();
return nodeId;
} else {
//rawInitializeNodeId has failed just due to contention or nodeId wasn't committed yet
connection.rollback();
logger.debugf("Rollback after failed to update NodeId to %s and haven't found any NodeId", newNodeId);
return null;
}
} catch (SQLException e) {
connection.rollback();
logger.debugf(e, "Rollback while trying to update NodeId to %s", newNodeId);
return null;
} finally {
connection.setAutoCommit(autoCommit);
} }
} catch (SQLException e) {
connection.rollback();
connection.setAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Rollback while trying to update NodeId to " + newNodeId, e);
}
return null;
}
if (nodeId != null) {
connection.commit();
connection.setAutoCommit(true);
return nodeId;
} else {
//that means that the rawInitializeNodeId has failed just due to contention or the nodeId wasn't committed yet
connection.rollback();
connection.setAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Rollback after failed to update NodeId to " + newNodeId + " and haven't found any NodeId");
}
return null;
} }
} }
@ -370,17 +314,26 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
synchronized (connection) { synchronized (connection) {
try { try {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
connection.setAutoCommit(true); final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
final State state; final State state;
final PreparedStatement preparedStatement = this.readState; try {
try (ResultSet resultSet = preparedStatement.executeQuery()) { final PreparedStatement preparedStatement = this.readState;
if (!resultSet.next()) { try (ResultSet resultSet = preparedStatement.executeQuery()) {
state = State.FIRST_TIME_START; if (!resultSet.next()) {
} else { state = State.FIRST_TIME_START;
state = decodeState(resultSet.getString(1)); } else {
state = decodeState(resultSet.getString(1));
}
} }
connection.commit();
return state;
} catch (SQLException ie) {
connection.rollback();
throw new IllegalStateException(ie);
} finally {
connection.setAutoCommit(autoCommit);
} }
return state;
} catch (SQLException e) { } catch (SQLException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
@ -393,11 +346,21 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
synchronized (connection) { synchronized (connection) {
try { try {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
connection.setAutoCommit(true); final boolean autoCommit = connection.getAutoCommit();
final PreparedStatement preparedStatement = this.writeState; connection.setAutoCommit(false);
preparedStatement.setString(1, encodedState); try {
if (preparedStatement.executeUpdate() != 1) { final PreparedStatement preparedStatement = this.writeState;
throw new IllegalStateException("can't write STATE to the JDBC Node Manager Store!"); preparedStatement.setString(1, encodedState);
if (preparedStatement.executeUpdate() != 1) {
throw new IllegalStateException("can't write state to the JDBC Node Manager Store!");
}
connection.commit();
} catch (SQLException ie) {
connection.rollback();
connection.setAutoCommit(true);
throw new IllegalStateException(ie);
} finally {
connection.setAutoCommit(autoCommit);
} }
} catch (SQLException e) { } catch (SQLException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
@ -408,17 +371,15 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
@Override @Override
public void stop() throws SQLException { public void stop() throws SQLException {
//release all the managed resources inside the connection lock //release all the managed resources inside the connection lock
if (sqlProvider.closeConnectionOnShutdown()) { synchronized (connection) {
synchronized (connection) { this.readNodeId.close();
this.readNodeId.close(); this.writeNodeId.close();
this.writeNodeId.close(); this.initializeNodeId.close();
this.initializeNodeId.close(); this.readState.close();
this.readState.close(); this.writeState.close();
this.writeState.close(); this.liveLock.close();
this.liveLock.close(); this.backupLock.close();
this.backupLock.close(); super.stop();
super.stop();
}
} }
} }

View File

@ -1970,13 +1970,6 @@
</xsd:documentation> </xsd:documentation>
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
<xsd:element name="jdbc-max-allowed-millis-from-db-time" type="xsd:int" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
The absolute time in milliseconds the system clock is allowed to be distant from the DB time
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:all> </xsd:all>
<xsd:attributeGroup ref="xml:specialAttrs"/> <xsd:attributeGroup ref="xml:specialAttrs"/>
</xsd:complexType> </xsd:complexType>

View File

@ -71,8 +71,7 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
UUID.randomUUID().toString(), UUID.randomUUID().toString(),
jdbcSharedStateManager.getConnection(), jdbcSharedStateManager.getConnection(),
sqlProvider, sqlProvider,
acquireMillis, acquireMillis);
jdbcSharedStateManager.timeDifferenceMillisFromDb());
} catch (SQLException e) { } catch (SQLException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
@ -100,7 +99,6 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
.usingConnectionUrl( .usingConnectionUrl(
UUID.randomUUID().toString(), UUID.randomUUID().toString(),
dbConf.getJdbcLockExpirationMillis(), dbConf.getJdbcLockExpirationMillis(),
dbConf.getJdbcMaxAllowedMillisFromDbTime(),
dbConf.getJdbcConnectionUrl(), dbConf.getJdbcConnectionUrl(),
dbConf.getJdbcDriverClassName(), dbConf.getJdbcDriverClassName(),
sqlProvider); sqlProvider);

View File

@ -52,7 +52,6 @@ public class JdbcSharedStateManagerTest extends ActiveMQTestBase {
return JdbcSharedStateManager.usingConnectionUrl( return JdbcSharedStateManager.usingConnectionUrl(
UUID.randomUUID().toString(), UUID.randomUUID().toString(),
dbConf.getJdbcLockExpirationMillis(), dbConf.getJdbcLockExpirationMillis(),
dbConf.getJdbcMaxAllowedMillisFromDbTime(),
dbConf.getJdbcConnectionUrl(), dbConf.getJdbcConnectionUrl(),
dbConf.getJdbcDriverClassName(), dbConf.getJdbcDriverClassName(),
sqlProvider); sqlProvider);

View File

@ -479,7 +479,6 @@ public abstract class ActiveMQTestBase extends Assert {
dbStorageConfiguration.setJdbcLockAcquisitionTimeoutMillis(getJdbcLockAcquisitionTimeoutMillis()); dbStorageConfiguration.setJdbcLockAcquisitionTimeoutMillis(getJdbcLockAcquisitionTimeoutMillis());
dbStorageConfiguration.setJdbcLockExpirationMillis(getJdbcLockExpirationMillis()); dbStorageConfiguration.setJdbcLockExpirationMillis(getJdbcLockExpirationMillis());
dbStorageConfiguration.setJdbcLockRenewPeriodMillis(getJdbcLockRenewPeriodMillis()); dbStorageConfiguration.setJdbcLockRenewPeriodMillis(getJdbcLockRenewPeriodMillis());
dbStorageConfiguration.setJdbcMaxAllowedMillisFromDbTime(getJdbcMaxAllowedMillisFromDbTime());
return dbStorageConfiguration; return dbStorageConfiguration;
} }
@ -495,10 +494,6 @@ public abstract class ActiveMQTestBase extends Assert {
return Long.getLong("jdbc.lock.renew", ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis()); return Long.getLong("jdbc.lock.renew", ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis());
} }
protected long getJdbcMaxAllowedMillisFromDbTime() {
return Long.getLong("jdbc.max.diff.db", ActiveMQDefaultConfiguration.getDefaultJdbcMaxAllowedMillisFromDbTime());
}
public void destroyTables(List<String> tableNames) throws Exception { public void destroyTables(List<String> tableNames) throws Exception {
Driver driver = getDriver(getJDBCClassName()); Driver driver = getDriver(getJDBCClassName());
Connection connection = driver.connect(getTestJDBCConnectionUrl(), null); Connection connection = driver.connect(getTestJDBCConnectionUrl(), null);

View File

@ -468,10 +468,6 @@ To configure Apache ActiveMQ Artemis to use a database for persisting messages a
The time in milliseconds a JDBC lock is considered valid without keeping it alive. The default value The time in milliseconds a JDBC lock is considered valid without keeping it alive. The default value
is 20000 milliseconds (ie 20 seconds). is 20000 milliseconds (ie 20 seconds).
- `jdbc-max-allowed-millis-from-db-time`
The absolute time in milliseconds the system clock is allowed to be distant from the DB time, otherwise a critical error will be raised. The default value is 60000 milliseconds (ie 60 seconds).
Note that some DBMS (e.g. Oracle, 30 chars) have restrictions on the size of table names, this should be taken into consideration when configuring table names for the Artemis database store, pay particular attention to the page store table name, which can be appended with a unique ID of up to 20 characters. (for Oracle this would mean configuring a page-store-table-name of max size of 10 chars). Note that some DBMS (e.g. Oracle, 30 chars) have restrictions on the size of table names, this should be taken into consideration when configuring table names for the Artemis database store, pay particular attention to the page store table name, which can be appended with a unique ID of up to 20 characters. (for Oracle this would mean configuring a page-store-table-name of max size of 10 chars).
## Zero Persistence ## Zero Persistence