ARTEMIS-1784 JDBC NodeManager should just use DMBS clock
It avoid using the system clock to perform the locks logic by using the DBMS time. It contains several improvements on the JDBC error handling and an improved observability thanks to debug logs.
This commit is contained in:
parent
7fa8c55f43
commit
fdb63df392
artemis-cli/src/main
java/org/apache/activemq/artemis/cli/commands
resources/org/apache/activemq/artemis/cli/commands/etc
artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config
artemis-jdbc-store/src/main/resources
artemis-server/src
main
java/org/apache/activemq/artemis/core
config/storage
deployers/impl
server/impl/jdbc
resources/schema
test/java/org/apache/activemq/artemis
core/server/impl/jdbc
tests/util
docs/user-manual/en
|
@ -299,9 +299,6 @@ public class Create extends InputAbstract {
|
|||
@Option(name = "--jdbc-lock-expiration", description = "Lock expiration")
|
||||
long jdbcLockExpiration = ActiveMQDefaultConfiguration.getDefaultJdbcLockExpirationMillis();
|
||||
|
||||
@Option(name = "--jdbc-max-allowed-millis-from-db-time", description = "Db time allowed difference")
|
||||
long jdbcMaxAllowedMillisFromDbTime = ActiveMQDefaultConfiguration.getDefaultJdbcMaxAllowedMillisFromDbTime();
|
||||
|
||||
private boolean IS_WINDOWS;
|
||||
private boolean IS_CYGWIN;
|
||||
|
||||
|
@ -629,7 +626,6 @@ public class Create extends InputAbstract {
|
|||
filters.put("${jdbcNetworkTimeout}", "" + jdbcNetworkTimeout);
|
||||
filters.put("${jdbcLockRenewPeriod}", "" + jdbcLockRenewPeriod);
|
||||
filters.put("${jdbcLockExpiration}", "" + jdbcLockExpiration);
|
||||
filters.put("${jdbcMaxAllowedMillisFromDbTime}", "" + jdbcMaxAllowedMillisFromDbTime);
|
||||
filters.put("${jdbc}", readTextFile(ETC_DATABASE_STORE_TXT, filters));
|
||||
} else {
|
||||
filters.put("${jdbc}", "");
|
||||
|
|
|
@ -13,7 +13,6 @@
|
|||
<node-manager-store-table-name>${jdbcNodeManager}</node-manager-store-table-name>
|
||||
<jdbc-lock-expiration>${jdbcLockExpiration}</jdbc-lock-expiration>
|
||||
<jdbc-lock-renew-period>${jdbcLockRenewPeriod}</jdbc-lock-renew-period>
|
||||
<jdbc-max-allowed-millis-from-db-time>${jdbcMaxAllowedMillisFromDbTime}</jdbc-max-allowed-millis-from-db-time>
|
||||
<jdbc-network-timeout>${jdbcNetworkTimeout}</jdbc-network-timeout>
|
||||
</database-store>
|
||||
</store>
|
||||
|
|
|
@ -451,8 +451,6 @@ public final class ActiveMQDefaultConfiguration {
|
|||
|
||||
private static final long DEFAULT_JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS = -1;
|
||||
|
||||
private static final long DEFAULT_JDBC_MAX_ALLOWED_MILLIS_FROM_DB_TIME = TimeUnit.SECONDS.toMillis(60);
|
||||
|
||||
// Default period to wait between connection TTL checks
|
||||
public static final long DEFAULT_CONNECTION_TTL_CHECK_INTERVAL = 2000;
|
||||
|
||||
|
@ -1258,10 +1256,6 @@ public final class ActiveMQDefaultConfiguration {
|
|||
return DEFAULT_JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS;
|
||||
}
|
||||
|
||||
public static long getDefaultJdbcMaxAllowedMillisFromDbTime() {
|
||||
return DEFAULT_JDBC_MAX_ALLOWED_MILLIS_FROM_DB_TIME;
|
||||
}
|
||||
|
||||
public static long getDefaultConnectionTtlCheckInterval() {
|
||||
return DEFAULT_CONNECTION_TTL_CHECK_INTERVAL;
|
||||
}
|
||||
|
|
|
@ -38,10 +38,10 @@ count-journal-record=SELECT COUNT(*) FROM %s
|
|||
|
||||
create-node-manager-store-table=CREATE TABLE %s (ID INT NOT NULL, HOLDER_ID VARCHAR(128), HOLDER_EXPIRATION_TIME TIMESTAMP, NODE_ID CHAR(36),STATE CHAR(1), PRIMARY KEY(ID))
|
||||
create-state=INSERT INTO %s (ID) VALUES (%s)
|
||||
try-acquire-lock=UPDATE %s SET HOLDER_ID = ?, HOLDER_EXPIRATION_TIME = ? WHERE (HOLDER_EXPIRATION_TIME IS NULL OR HOLDER_EXPIRATION_TIME < CURRENT_TIMESTAMP) AND ID = %s
|
||||
try-acquire-lock=UPDATE %s SET HOLDER_ID = ?, HOLDER_EXPIRATION_TIME = ? WHERE (HOLDER_EXPIRATION_TIME IS NULL OR (HOLDER_EXPIRATION_TIME < CURRENT_TIMESTAMP AND ? > CURRENT_TIMESTAMP)) AND ID = %s
|
||||
try-release-lock=UPDATE %s SET HOLDER_ID = NULL, HOLDER_EXPIRATION_TIME = NULL WHERE HOLDER_ID = ? AND ID = %s
|
||||
is-locked=SELECT HOLDER_ID, HOLDER_EXPIRATION_TIME FROM %s WHERE ID = %s
|
||||
renew-lock=UPDATE %s SET HOLDER_EXPIRATION_TIME = ? WHERE HOLDER_ID = ? AND ID = %s
|
||||
is-locked=SELECT HOLDER_ID, HOLDER_EXPIRATION_TIME, CURRENT_TIMESTAMP FROM %s WHERE ID = %s
|
||||
renew-lock=UPDATE %s SET HOLDER_EXPIRATION_TIME = ? WHERE HOLDER_ID = ? AND HOLDER_EXPIRATION_TIME IS NOT NULL AND ? > HOLDER_EXPIRATION_TIME AND ? > CURRENT_TIMESTAMP AND ID = %s
|
||||
current-timestamp=SELECT CURRENT_TIMESTAMP FROM %s
|
||||
write-state=UPDATE %s SET STATE = ? WHERE ID = %s
|
||||
read-state=SELECT STATE FROM %s WHERE ID = %s
|
||||
|
|
|
@ -50,8 +50,6 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
|
|||
|
||||
private long jdbcLockAcquisitionTimeoutMillis = ActiveMQDefaultConfiguration.getDefaultJdbcLockAcquisitionTimeoutMillis();
|
||||
|
||||
private long jdbcMaxAllowedMillisFromDbTime = ActiveMQDefaultConfiguration.getDefaultJdbcMaxAllowedMillisFromDbTime();
|
||||
|
||||
private long jdbcJournalSyncPeriodMillis = ActiveMQDefaultConfiguration.getDefaultJdbcJournalSyncPeriodMillis();
|
||||
|
||||
@Override
|
||||
|
@ -187,12 +185,4 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
|
|||
public void setJdbcJournalSyncPeriodMillis(long jdbcJournalSyncPeriodMillis) {
|
||||
this.jdbcJournalSyncPeriodMillis = jdbcJournalSyncPeriodMillis;
|
||||
}
|
||||
|
||||
public long getJdbcMaxAllowedMillisFromDbTime() {
|
||||
return jdbcMaxAllowedMillisFromDbTime;
|
||||
}
|
||||
|
||||
public void setJdbcMaxAllowedMillisFromDbTime(long jdbcMaxAllowedMillisFromDbTime) {
|
||||
this.jdbcMaxAllowedMillisFromDbTime = jdbcMaxAllowedMillisFromDbTime;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1473,7 +1473,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
conf.setJdbcNetworkTimeout(getInteger(storeNode, "jdbc-network-timeout", conf.getJdbcNetworkTimeout(), Validators.NO_CHECK));
|
||||
conf.setJdbcLockRenewPeriodMillis(getLong(storeNode, "jdbc-lock-renew-period", conf.getJdbcLockRenewPeriodMillis(), Validators.NO_CHECK));
|
||||
conf.setJdbcLockExpirationMillis(getLong(storeNode, "jdbc-lock-expiration", conf.getJdbcLockExpirationMillis(), Validators.NO_CHECK));
|
||||
conf.setJdbcMaxAllowedMillisFromDbTime(getLong(storeNode, "jdbc-max-allowed-millis-from-db-time", conf.getJdbcMaxAllowedMillisFromDbTime(), Validators.NO_CHECK));
|
||||
conf.setJdbcJournalSyncPeriodMillis(getLong(storeNode, "jdbc-journal-sync-period", conf.getJdbcJournalSyncPeriodMillis(), Validators.NO_CHECK));
|
||||
return conf;
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.sql.ResultSet;
|
|||
import java.sql.SQLException;
|
||||
import java.sql.Timestamp;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import org.jboss.logging.Logger;
|
||||
|
@ -35,14 +36,15 @@ final class JdbcLeaseLock implements LeaseLock {
|
|||
private static final Logger LOGGER = Logger.getLogger(JdbcLeaseLock.class);
|
||||
private static final int MAX_HOLDER_ID_LENGTH = 128;
|
||||
private final Connection connection;
|
||||
private long millisDiffFromDbTime;
|
||||
private final String holderId;
|
||||
private final PreparedStatement tryAcquireLock;
|
||||
private final PreparedStatement tryReleaseLock;
|
||||
private final PreparedStatement renewLock;
|
||||
private final PreparedStatement isLocked;
|
||||
private final PreparedStatement currentDateTime;
|
||||
private final long expirationMillis;
|
||||
private boolean maybeAcquired;
|
||||
private final String lockName;
|
||||
|
||||
/**
|
||||
* The lock will be responsible (ie {@link #close()}) of all the {@link PreparedStatement}s used by it, but not of the {@link Connection},
|
||||
|
@ -54,20 +56,22 @@ final class JdbcLeaseLock implements LeaseLock {
|
|||
PreparedStatement tryReleaseLock,
|
||||
PreparedStatement renewLock,
|
||||
PreparedStatement isLocked,
|
||||
PreparedStatement currentDateTime,
|
||||
long expirationMIllis,
|
||||
long millisDiffFromDbTime) {
|
||||
String lockName) {
|
||||
if (holderId.length() > MAX_HOLDER_ID_LENGTH) {
|
||||
throw new IllegalArgumentException("holderId length must be <=" + MAX_HOLDER_ID_LENGTH);
|
||||
}
|
||||
this.holderId = holderId;
|
||||
this.millisDiffFromDbTime = millisDiffFromDbTime;
|
||||
this.tryAcquireLock = tryAcquireLock;
|
||||
this.tryReleaseLock = tryReleaseLock;
|
||||
this.renewLock = renewLock;
|
||||
this.isLocked = isLocked;
|
||||
this.currentDateTime = currentDateTime;
|
||||
this.expirationMillis = expirationMIllis;
|
||||
this.maybeAcquired = false;
|
||||
this.connection = connection;
|
||||
this.lockName = lockName;
|
||||
}
|
||||
|
||||
public String holderId() {
|
||||
|
@ -79,32 +83,88 @@ final class JdbcLeaseLock implements LeaseLock {
|
|||
return expirationMillis;
|
||||
}
|
||||
|
||||
private long timeDifference() {
|
||||
return millisDiffFromDbTime;
|
||||
private String readableLockStatus() {
|
||||
try {
|
||||
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
|
||||
final boolean autoCommit = connection.getAutoCommit();
|
||||
connection.setAutoCommit(false);
|
||||
try {
|
||||
final String lockStatus;
|
||||
final PreparedStatement preparedStatement = this.isLocked;
|
||||
try (ResultSet resultSet = preparedStatement.executeQuery()) {
|
||||
if (!resultSet.next()) {
|
||||
lockStatus = null;
|
||||
} else {
|
||||
final String currentHolderId = resultSet.getString(1);
|
||||
final Timestamp expirationTime = resultSet.getTimestamp(2);
|
||||
final Timestamp currentTimestamp = resultSet.getTimestamp(3);
|
||||
lockStatus = "holderId = " + currentHolderId + " expirationTime = " + expirationTime + " currentTimestamp = " + currentTimestamp;
|
||||
}
|
||||
}
|
||||
connection.commit();
|
||||
return lockStatus;
|
||||
} catch (SQLException ie) {
|
||||
connection.rollback();
|
||||
return ie.getMessage();
|
||||
} finally {
|
||||
connection.setAutoCommit(autoCommit);
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
return e.getMessage();
|
||||
}
|
||||
}
|
||||
|
||||
private long dbCurrentTimeMillis() throws SQLException {
|
||||
final long start = System.nanoTime();
|
||||
try (ResultSet resultSet = currentDateTime.executeQuery()) {
|
||||
resultSet.next();
|
||||
final Timestamp currentTimestamp = resultSet.getTimestamp(1);
|
||||
final long elapsedTime = System.nanoTime() - start;
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debugf("[%s] %s query currentTimestamp = %s tooks %d ms",
|
||||
lockName, holderId, currentTimestamp, TimeUnit.NANOSECONDS.toMillis(elapsedTime));
|
||||
}
|
||||
return currentTimestamp.getTime();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean renew() {
|
||||
synchronized (connection) {
|
||||
try {
|
||||
final boolean result;
|
||||
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
|
||||
final boolean autoCommit = connection.getAutoCommit();
|
||||
connection.setAutoCommit(false);
|
||||
try {
|
||||
final long timeDifference = timeDifference();
|
||||
final PreparedStatement preparedStatement = this.renewLock;
|
||||
final long now = System.currentTimeMillis() + timeDifference;
|
||||
final Timestamp timestamp = new Timestamp(now + expirationMillis);
|
||||
preparedStatement.setTimestamp(1, timestamp);
|
||||
final long now = dbCurrentTimeMillis();
|
||||
final Timestamp expirationTime = new Timestamp(now + expirationMillis);
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debugf("[%s] %s is renewing lock with expirationTime = %s",
|
||||
lockName, holderId, expirationTime);
|
||||
}
|
||||
preparedStatement.setTimestamp(1, expirationTime);
|
||||
preparedStatement.setString(2, holderId);
|
||||
result = preparedStatement.executeUpdate() == 1;
|
||||
preparedStatement.setTimestamp(3, expirationTime);
|
||||
preparedStatement.setTimestamp(4, expirationTime);
|
||||
final int updatedRows = preparedStatement.executeUpdate();
|
||||
final boolean renewed = updatedRows == 1;
|
||||
connection.commit();
|
||||
if (!renewed) {
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debugf("[%s] %s has failed to renew lock: lock status = { %s }",
|
||||
lockName, holderId, readableLockStatus());
|
||||
}
|
||||
} else {
|
||||
LOGGER.debugf("[%s] %s has renewed lock", lockName, holderId);
|
||||
}
|
||||
return renewed;
|
||||
} catch (SQLException ie) {
|
||||
connection.rollback();
|
||||
connection.setAutoCommit(true);
|
||||
throw new IllegalStateException(ie);
|
||||
} finally {
|
||||
connection.setAutoCommit(autoCommit);
|
||||
}
|
||||
connection.commit();
|
||||
connection.setAutoCommit(true);
|
||||
return result;
|
||||
} catch (SQLException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
|
@ -115,30 +175,36 @@ final class JdbcLeaseLock implements LeaseLock {
|
|||
public boolean tryAcquire() {
|
||||
synchronized (connection) {
|
||||
try {
|
||||
final boolean acquired;
|
||||
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
|
||||
final boolean autoCommit = connection.getAutoCommit();
|
||||
connection.setAutoCommit(false);
|
||||
try {
|
||||
final long timeDifference = timeDifference();
|
||||
final PreparedStatement preparedStatement = tryAcquireLock;
|
||||
final long now = System.currentTimeMillis() + timeDifference;
|
||||
final long now = dbCurrentTimeMillis();
|
||||
preparedStatement.setString(1, holderId);
|
||||
final Timestamp timestamp = new Timestamp(now + expirationMillis);
|
||||
preparedStatement.setTimestamp(2, timestamp);
|
||||
acquired = preparedStatement.executeUpdate() == 1;
|
||||
final Timestamp expirationTime = new Timestamp(now + expirationMillis);
|
||||
preparedStatement.setTimestamp(2, expirationTime);
|
||||
preparedStatement.setTimestamp(3, expirationTime);
|
||||
LOGGER.debugf("[%s] %s is trying to acquire lock with expirationTime %s",
|
||||
lockName, holderId, expirationTime);
|
||||
final boolean acquired = preparedStatement.executeUpdate() == 1;
|
||||
connection.commit();
|
||||
if (acquired) {
|
||||
this.maybeAcquired = true;
|
||||
LOGGER.debugf("[%s] %s has acquired lock", lockName, holderId);
|
||||
} else {
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debugf("[%s] %s has failed to acquire lock: lock status = { %s }",
|
||||
lockName, holderId, readableLockStatus());
|
||||
}
|
||||
}
|
||||
return acquired;
|
||||
} catch (SQLException ie) {
|
||||
connection.rollback();
|
||||
connection.setAutoCommit(true);
|
||||
throw new IllegalStateException(ie);
|
||||
} finally {
|
||||
connection.setAutoCommit(autoCommit);
|
||||
}
|
||||
connection.commit();
|
||||
connection.setAutoCommit(true);
|
||||
if (acquired) {
|
||||
this.maybeAcquired = true;
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug(holderId + " has acquired a lock");
|
||||
}
|
||||
}
|
||||
return acquired;
|
||||
} catch (SQLException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
|
@ -158,10 +224,11 @@ final class JdbcLeaseLock implements LeaseLock {
|
|||
private boolean checkValidHolderId(Predicate<? super String> holderIdFilter) {
|
||||
synchronized (connection) {
|
||||
try {
|
||||
boolean result;
|
||||
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
|
||||
final boolean autoCommit = connection.getAutoCommit();
|
||||
connection.setAutoCommit(false);
|
||||
try {
|
||||
final long timeDifference = timeDifference();
|
||||
boolean result;
|
||||
final PreparedStatement preparedStatement = this.isLocked;
|
||||
try (ResultSet resultSet = preparedStatement.executeQuery()) {
|
||||
if (!resultSet.next()) {
|
||||
|
@ -169,29 +236,33 @@ final class JdbcLeaseLock implements LeaseLock {
|
|||
} else {
|
||||
final String currentHolderId = resultSet.getString(1);
|
||||
result = holderIdFilter.test(currentHolderId);
|
||||
//warn about any zombie lock
|
||||
final Timestamp timestamp = resultSet.getTimestamp(2);
|
||||
if (timestamp != null) {
|
||||
final long lockExpirationTime = timestamp.getTime();
|
||||
final long now = System.currentTimeMillis() + timeDifference;
|
||||
final long expiredBy = now - lockExpirationTime;
|
||||
final Timestamp expirationTime = resultSet.getTimestamp(2);
|
||||
final Timestamp currentTimestamp = resultSet.getTimestamp(3);
|
||||
final long currentTimestampMillis = currentTimestamp.getTime();
|
||||
boolean zombie = false;
|
||||
if (expirationTime != null) {
|
||||
final long lockExpirationTime = expirationTime.getTime();
|
||||
final long expiredBy = currentTimestampMillis - lockExpirationTime;
|
||||
if (expiredBy > 0) {
|
||||
result = false;
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("found zombie lock with holderId: " + currentHolderId + " expired by: " + expiredBy + " ms");
|
||||
}
|
||||
zombie = true;
|
||||
}
|
||||
}
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debugf("[%s] %s has found %s with holderId = %s expirationTime = %s currentTimestamp = %s",
|
||||
lockName, holderId, zombie ? "zombie lock" : "lock",
|
||||
currentHolderId, expirationTime, currentTimestamp);
|
||||
}
|
||||
}
|
||||
}
|
||||
connection.commit();
|
||||
return result;
|
||||
} catch (SQLException ie) {
|
||||
connection.rollback();
|
||||
connection.setAutoCommit(true);
|
||||
throw new IllegalStateException(ie);
|
||||
} finally {
|
||||
connection.setAutoCommit(autoCommit);
|
||||
}
|
||||
connection.commit();
|
||||
connection.setAutoCommit(true);
|
||||
return result;
|
||||
} catch (SQLException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
|
@ -202,26 +273,30 @@ final class JdbcLeaseLock implements LeaseLock {
|
|||
public void release() {
|
||||
synchronized (connection) {
|
||||
try {
|
||||
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
|
||||
final boolean autoCommit = connection.getAutoCommit();
|
||||
connection.setAutoCommit(false);
|
||||
try {
|
||||
final PreparedStatement preparedStatement = this.tryReleaseLock;
|
||||
preparedStatement.setString(1, holderId);
|
||||
if (preparedStatement.executeUpdate() != 1) {
|
||||
LOGGER.warn(holderId + " has failed to release a lock");
|
||||
} else {
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug(holderId + " has released a lock");
|
||||
}
|
||||
}
|
||||
final boolean released = preparedStatement.executeUpdate() == 1;
|
||||
//consider it as released to avoid on finalize to be reclaimed
|
||||
this.maybeAcquired = false;
|
||||
connection.commit();
|
||||
if (!released) {
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debugf("[%s] %s has failed to release lock: lock status = { %s }",
|
||||
lockName, holderId, readableLockStatus());
|
||||
}
|
||||
} else {
|
||||
LOGGER.debugf("[%s] %s has released lock", lockName, holderId);
|
||||
}
|
||||
} catch (SQLException ie) {
|
||||
connection.rollback();
|
||||
connection.setAutoCommit(true);
|
||||
throw new IllegalStateException(ie);
|
||||
} finally {
|
||||
connection.setAutoCommit(autoCommit);
|
||||
}
|
||||
connection.commit();
|
||||
connection.setAutoCommit(true);
|
||||
} catch (SQLException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
|
@ -242,6 +317,7 @@ final class JdbcLeaseLock implements LeaseLock {
|
|||
this.tryAcquireLock.close();
|
||||
this.renewLock.close();
|
||||
this.isLocked.close();
|
||||
this.currentDateTime.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -41,7 +41,7 @@ import org.jboss.logging.Logger;
|
|||
*/
|
||||
public final class JdbcNodeManager extends NodeManager {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(JdbcNodeManager.class);
|
||||
private static final Logger LOGGER = Logger.getLogger(JdbcNodeManager.class);
|
||||
private static final long MAX_PAUSE_MILLIS = 2000L;
|
||||
|
||||
private final Supplier<? extends SharedStateManager> sharedStateManagerFactory;
|
||||
|
@ -50,7 +50,6 @@ public final class JdbcNodeManager extends NodeManager {
|
|||
private SharedStateManager sharedStateManager;
|
||||
private ScheduledLeaseLock scheduledLiveLock;
|
||||
private ScheduledLeaseLock scheduledBackupLock;
|
||||
private final long lockRenewPeriodMillis;
|
||||
private final long lockAcquisitionTimeoutMillis;
|
||||
private volatile boolean interrupted = false;
|
||||
private final LeaseLock.Pauser pauser;
|
||||
|
@ -74,7 +73,6 @@ public final class JdbcNodeManager extends NodeManager {
|
|||
configuration.getJdbcLockExpirationMillis(),
|
||||
configuration.getJdbcLockRenewPeriodMillis(),
|
||||
configuration.getJdbcLockAcquisitionTimeoutMillis(),
|
||||
configuration.getJdbcMaxAllowedMillisFromDbTime(),
|
||||
configuration.getDataSource(),
|
||||
sqlProviderFactory.create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER),
|
||||
scheduledExecutorService,
|
||||
|
@ -88,7 +86,6 @@ public final class JdbcNodeManager extends NodeManager {
|
|||
configuration.getJdbcLockExpirationMillis(),
|
||||
configuration.getJdbcLockRenewPeriodMillis(),
|
||||
configuration.getJdbcLockAcquisitionTimeoutMillis(),
|
||||
configuration.getJdbcMaxAllowedMillisFromDbTime(),
|
||||
configuration.getJdbcConnectionUrl(),
|
||||
configuration.getJdbcDriverClassName(),
|
||||
sqlProvider,
|
||||
|
@ -103,7 +100,6 @@ public final class JdbcNodeManager extends NodeManager {
|
|||
long lockExpirationMillis,
|
||||
long lockRenewPeriodMillis,
|
||||
long lockAcquisitionTimeoutMillis,
|
||||
long maxAllowedMillisFromDbTime,
|
||||
DataSource dataSource,
|
||||
SQLProvider provider,
|
||||
ScheduledExecutorService scheduledExecutorService,
|
||||
|
@ -114,10 +110,8 @@ public final class JdbcNodeManager extends NodeManager {
|
|||
networkTimeoutMillis,
|
||||
executorFactory == null ? null : executorFactory.getExecutor(),
|
||||
lockExpirationMillis,
|
||||
maxAllowedMillisFromDbTime,
|
||||
dataSource,
|
||||
provider),
|
||||
false,
|
||||
lockRenewPeriodMillis,
|
||||
lockAcquisitionTimeoutMillis,
|
||||
scheduledExecutorService,
|
||||
|
@ -130,7 +124,6 @@ public final class JdbcNodeManager extends NodeManager {
|
|||
long lockExpirationMillis,
|
||||
long lockRenewPeriodMillis,
|
||||
long lockAcquisitionTimeoutMillis,
|
||||
long maxAllowedMillisFromDbTime,
|
||||
String jdbcUrl,
|
||||
String driverClass,
|
||||
SQLProvider provider,
|
||||
|
@ -142,11 +135,9 @@ public final class JdbcNodeManager extends NodeManager {
|
|||
networkTimeoutMillis,
|
||||
executorFactory == null ? null : executorFactory.getExecutor(),
|
||||
lockExpirationMillis,
|
||||
maxAllowedMillisFromDbTime,
|
||||
jdbcUrl,
|
||||
driverClass,
|
||||
provider),
|
||||
false,
|
||||
lockRenewPeriodMillis,
|
||||
lockAcquisitionTimeoutMillis,
|
||||
scheduledExecutorService,
|
||||
|
@ -169,24 +160,22 @@ public final class JdbcNodeManager extends NodeManager {
|
|||
final int networkTimeout = configuration.getJdbcNetworkTimeout();
|
||||
if (networkTimeout >= 0) {
|
||||
if (networkTimeout > lockExpiration) {
|
||||
logger.warn("jdbc-network-timeout isn't properly configured: the recommended value is <= jdbc-lock-expiration");
|
||||
LOGGER.warn("jdbc-network-timeout isn't properly configured: the recommended value is <= jdbc-lock-expiration");
|
||||
}
|
||||
} else {
|
||||
logger.warn("jdbc-network-timeout isn't properly configured: the recommended value is <= jdbc-lock-expiration");
|
||||
LOGGER.warn("jdbc-network-timeout isn't properly configured: the recommended value is <= jdbc-lock-expiration");
|
||||
}
|
||||
}
|
||||
|
||||
private JdbcNodeManager(Supplier<? extends SharedStateManager> sharedStateManagerFactory,
|
||||
boolean replicatedBackup,
|
||||
long lockRenewPeriodMillis,
|
||||
long lockAcquisitionTimeoutMillis,
|
||||
ScheduledExecutorService scheduledExecutorService,
|
||||
ExecutorFactory executorFactory,
|
||||
IOCriticalErrorListener ioCriticalErrorListener) {
|
||||
super(replicatedBackup, null);
|
||||
super(false, null);
|
||||
this.lockAcquisitionTimeoutMillis = lockAcquisitionTimeoutMillis;
|
||||
this.lockRenewPeriodMillis = lockRenewPeriodMillis;
|
||||
this.pauser = LeaseLock.Pauser.sleep(Math.min(this.lockRenewPeriodMillis, MAX_PAUSE_MILLIS), TimeUnit.MILLISECONDS);
|
||||
this.pauser = LeaseLock.Pauser.sleep(Math.min(lockRenewPeriodMillis, MAX_PAUSE_MILLIS), TimeUnit.MILLISECONDS);
|
||||
this.sharedStateManagerFactory = sharedStateManagerFactory;
|
||||
this.scheduledLiveLockFactory = () -> ScheduledLeaseLock.of(
|
||||
scheduledExecutorService,
|
||||
|
@ -217,10 +206,9 @@ public final class JdbcNodeManager extends NodeManager {
|
|||
return;
|
||||
}
|
||||
this.sharedStateManager = sharedStateManagerFactory.get();
|
||||
if (!replicatedBackup) {
|
||||
final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID);
|
||||
setUUID(nodeId);
|
||||
}
|
||||
LOGGER.debug("setup sharedStateManager on start");
|
||||
final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID);
|
||||
setUUID(nodeId);
|
||||
this.scheduledLiveLock = scheduledLiveLockFactory.get();
|
||||
this.scheduledBackupLock = scheduledBackupLockFactory.get();
|
||||
super.start();
|
||||
|
@ -259,35 +247,62 @@ public final class JdbcNodeManager extends NodeManager {
|
|||
|
||||
@Override
|
||||
public boolean isAwaitingFailback() throws Exception {
|
||||
return readSharedState() == SharedStateManager.State.FAILING_BACK;
|
||||
LOGGER.debug("ENTER isAwaitingFailback");
|
||||
try {
|
||||
return readSharedState() == SharedStateManager.State.FAILING_BACK;
|
||||
} finally {
|
||||
LOGGER.debug("EXIT isAwaitingFailback");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isBackupLive() throws Exception {
|
||||
//is anyone holding the live lock?
|
||||
return this.scheduledLiveLock.lock().isHeld();
|
||||
LOGGER.debug("ENTER isBackupLive");
|
||||
try {
|
||||
//is anyone holding the live lock?
|
||||
return this.scheduledLiveLock.lock().isHeld();
|
||||
} finally {
|
||||
LOGGER.debug("EXIT isBackupLive");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stopBackup() throws Exception {
|
||||
if (replicatedBackup) {
|
||||
final UUID nodeId = getUUID();
|
||||
sharedStateManager.writeNodeId(nodeId);
|
||||
LOGGER.debug("ENTER stopBackup");
|
||||
try {
|
||||
if (this.scheduledBackupLock.isStarted()) {
|
||||
LOGGER.debug("scheduledBackupLock is running: stop it and release backup lock");
|
||||
this.scheduledBackupLock.stop();
|
||||
this.scheduledBackupLock.lock().release();
|
||||
} else {
|
||||
LOGGER.debug("scheduledBackupLock is not running");
|
||||
}
|
||||
} finally {
|
||||
LOGGER.debug("EXIT stopBackup");
|
||||
}
|
||||
releaseBackup();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void interrupt() {
|
||||
LOGGER.debug("ENTER interrupted");
|
||||
//need to be volatile: must be called concurrently to work as expected
|
||||
interrupted = true;
|
||||
LOGGER.debug("EXIT interrupted");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void releaseBackup() throws Exception {
|
||||
if (this.scheduledBackupLock.lock().isHeldByCaller()) {
|
||||
this.scheduledBackupLock.stop();
|
||||
this.scheduledBackupLock.lock().release();
|
||||
LOGGER.debug("ENTER releaseBackup");
|
||||
try {
|
||||
if (this.scheduledBackupLock.isStarted()) {
|
||||
LOGGER.debug("scheduledBackupLock is running: stop it and release backup lock");
|
||||
this.scheduledBackupLock.stop();
|
||||
this.scheduledBackupLock.lock().release();
|
||||
} else {
|
||||
LOGGER.debug("scheduledBackupLock is not running");
|
||||
}
|
||||
} finally {
|
||||
LOGGER.debug("EXIT releaseBackup");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -322,11 +337,8 @@ public final class JdbcNodeManager extends NodeManager {
|
|||
if (acquiredMillis > this.scheduledLiveLock.renewPeriodMillis()) {
|
||||
if (!this.scheduledLiveLock.lock().renew()) {
|
||||
final IllegalStateException e = new IllegalStateException("live lock can't be renewed");
|
||||
try {
|
||||
ioCriticalErrorListener.onIOException(e, "live lock can't be renewed", null);
|
||||
} finally {
|
||||
throw e;
|
||||
}
|
||||
ioCriticalErrorListener.onIOException(e, "live lock can't be renewed", null);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -343,7 +355,7 @@ public final class JdbcNodeManager extends NodeManager {
|
|||
try {
|
||||
stateWhileLocked = readSharedState();
|
||||
} catch (Throwable t) {
|
||||
logger.error("error while holding the live node lock and tried to read the shared state", t);
|
||||
LOGGER.error("error while holding the live node lock and tried to read the shared state", t);
|
||||
this.scheduledLiveLock.lock().release();
|
||||
throw t;
|
||||
}
|
||||
|
@ -351,9 +363,7 @@ public final class JdbcNodeManager extends NodeManager {
|
|||
renewLiveLockIfNeeded(acquiredOn);
|
||||
liveWhileLocked = true;
|
||||
} else {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("state is " + stateWhileLocked + " while holding the live lock");
|
||||
}
|
||||
LOGGER.debugf("state is %s while holding the live lock: releasing live lock", stateWhileLocked);
|
||||
//state is not live: can (try to) release the lock
|
||||
this.scheduledLiveLock.lock().release();
|
||||
}
|
||||
|
@ -362,98 +372,133 @@ public final class JdbcNodeManager extends NodeManager {
|
|||
|
||||
@Override
|
||||
public void awaitLiveNode() throws Exception {
|
||||
boolean liveWhileLocked = false;
|
||||
while (!liveWhileLocked) {
|
||||
//check first without holding any lock
|
||||
final SharedStateManager.State state = readSharedState();
|
||||
if (state == SharedStateManager.State.LIVE) {
|
||||
//verify if the state is live while holding the live node lock too
|
||||
liveWhileLocked = lockLiveAndCheckLiveState();
|
||||
} else {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("awaiting live node...state: " + state);
|
||||
LOGGER.debug("ENTER awaitLiveNode");
|
||||
try {
|
||||
boolean liveWhileLocked = false;
|
||||
while (!liveWhileLocked) {
|
||||
//check first without holding any lock
|
||||
final SharedStateManager.State state = readSharedState();
|
||||
if (state == SharedStateManager.State.LIVE) {
|
||||
//verify if the state is live while holding the live node lock too
|
||||
liveWhileLocked = lockLiveAndCheckLiveState();
|
||||
} else {
|
||||
LOGGER.debugf("state while awaiting live node: %s", state);
|
||||
}
|
||||
if (!liveWhileLocked) {
|
||||
checkInterrupted(() -> "awaitLiveNode got interrupted!");
|
||||
pauser.idle();
|
||||
}
|
||||
}
|
||||
if (!liveWhileLocked) {
|
||||
checkInterrupted(() -> "awaitLiveNode got interrupted!");
|
||||
pauser.idle();
|
||||
}
|
||||
//state is LIVE and live lock is acquired and valid
|
||||
LOGGER.debugf("acquired live node lock while state is %s: starting scheduledLiveLock", SharedStateManager.State.LIVE);
|
||||
this.scheduledLiveLock.start();
|
||||
} finally {
|
||||
LOGGER.debug("EXIT awaitLiveNode");
|
||||
}
|
||||
//state is LIVE and live lock is acquired and valid
|
||||
logger.debug("acquired live node lock");
|
||||
this.scheduledLiveLock.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void startBackup() throws Exception {
|
||||
assert !replicatedBackup; // should not be called if this is a replicating backup
|
||||
ActiveMQServerLogger.LOGGER.waitingToBecomeBackup();
|
||||
LOGGER.debug("ENTER startBackup");
|
||||
try {
|
||||
ActiveMQServerLogger.LOGGER.waitingToBecomeBackup();
|
||||
|
||||
lock(scheduledBackupLock.lock());
|
||||
scheduledBackupLock.start();
|
||||
ActiveMQServerLogger.LOGGER.gotBackupLock();
|
||||
if (getUUID() == null)
|
||||
readNodeId();
|
||||
lock(scheduledBackupLock.lock());
|
||||
scheduledBackupLock.start();
|
||||
ActiveMQServerLogger.LOGGER.gotBackupLock();
|
||||
if (getUUID() == null)
|
||||
readNodeId();
|
||||
} finally {
|
||||
LOGGER.debug("EXIT startBackup");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActivateCallback startLiveNode() throws Exception {
|
||||
setFailingBack();
|
||||
LOGGER.debug("ENTER startLiveNode");
|
||||
try {
|
||||
setFailingBack();
|
||||
|
||||
final String timeoutMessage = lockAcquisitionTimeoutMillis == -1 ? "indefinitely" : lockAcquisitionTimeoutMillis + " milliseconds";
|
||||
final String timeoutMessage = lockAcquisitionTimeoutMillis == -1 ? "indefinitely" : lockAcquisitionTimeoutMillis + " milliseconds";
|
||||
|
||||
ActiveMQServerLogger.LOGGER.waitingToObtainLiveLock(timeoutMessage);
|
||||
ActiveMQServerLogger.LOGGER.waitingToObtainLiveLock(timeoutMessage);
|
||||
|
||||
lock(this.scheduledLiveLock.lock());
|
||||
lock(this.scheduledLiveLock.lock());
|
||||
|
||||
this.scheduledLiveLock.start();
|
||||
this.scheduledLiveLock.start();
|
||||
|
||||
ActiveMQServerLogger.LOGGER.obtainedLiveLock();
|
||||
ActiveMQServerLogger.LOGGER.obtainedLiveLock();
|
||||
|
||||
return new ActivateCallback() {
|
||||
@Override
|
||||
public void activationComplete() {
|
||||
try {
|
||||
//state can be written only if the live renew task is running
|
||||
setLive();
|
||||
} catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
|
||||
return new ActivateCallback() {
|
||||
@Override
|
||||
public void activationComplete() {
|
||||
LOGGER.debug("ENTER activationComplete");
|
||||
try {
|
||||
//state can be written only if the live renew task is running
|
||||
setLive();
|
||||
} catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
|
||||
} finally {
|
||||
LOGGER.debug("EXIT activationComplete");
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
};
|
||||
} finally {
|
||||
LOGGER.debug("EXIT startLiveNode");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pauseLiveServer() throws Exception {
|
||||
if (scheduledLiveLock.isStarted()) {
|
||||
setPaused();
|
||||
scheduledLiveLock.stop();
|
||||
scheduledLiveLock.lock().release();
|
||||
} else if (scheduledLiveLock.lock().renew()) {
|
||||
setPaused();
|
||||
scheduledLiveLock.lock().release();
|
||||
} else {
|
||||
final IllegalStateException e = new IllegalStateException("live lock can't be renewed");
|
||||
try {
|
||||
ioCriticalErrorListener.onIOException(e, "live lock can't be renewed on pauseLiveServer", null);
|
||||
} finally {
|
||||
throw e;
|
||||
LOGGER.debug("ENTER pauseLiveServer");
|
||||
try {
|
||||
if (scheduledLiveLock.isStarted()) {
|
||||
LOGGER.debug("scheduledLiveLock is running: set paused shared state, stop it and release live lock");
|
||||
setPaused();
|
||||
scheduledLiveLock.stop();
|
||||
scheduledLiveLock.lock().release();
|
||||
} else {
|
||||
LOGGER.debug("scheduledLiveLock is not running: try renew live lock");
|
||||
if (scheduledLiveLock.lock().renew()) {
|
||||
LOGGER.debug("live lock renewed: set paused shared state and release live lock");
|
||||
setPaused();
|
||||
scheduledLiveLock.lock().release();
|
||||
} else {
|
||||
final IllegalStateException e = new IllegalStateException("live lock can't be renewed");
|
||||
ioCriticalErrorListener.onIOException(e, "live lock can't be renewed on pauseLiveServer", null);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
LOGGER.debug("EXIT pauseLiveServer");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void crashLiveServer() throws Exception {
|
||||
if (this.scheduledLiveLock.lock().isHeldByCaller()) {
|
||||
scheduledLiveLock.stop();
|
||||
this.scheduledLiveLock.lock().release();
|
||||
LOGGER.debug("ENTER crashLiveServer");
|
||||
try {
|
||||
if (this.scheduledLiveLock.isStarted()) {
|
||||
LOGGER.debug("scheduledLiveLock is running: request stop it and release live lock");
|
||||
this.scheduledLiveLock.stop();
|
||||
this.scheduledLiveLock.lock().release();
|
||||
} else {
|
||||
LOGGER.debug("scheduledLiveLock is not running");
|
||||
}
|
||||
} finally {
|
||||
LOGGER.debug("EXIT crashLiveServer");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void awaitLiveStatus() {
|
||||
while (readSharedState() != SharedStateManager.State.LIVE) {
|
||||
pauser.idle();
|
||||
LOGGER.debug("ENTER awaitLiveStatus");
|
||||
try {
|
||||
while (readSharedState() != SharedStateManager.State.LIVE) {
|
||||
pauser.idle();
|
||||
}
|
||||
} finally {
|
||||
LOGGER.debug("EXIT awaitLiveStatus");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -470,17 +515,20 @@ public final class JdbcNodeManager extends NodeManager {
|
|||
}
|
||||
|
||||
private void writeSharedState(SharedStateManager.State state) {
|
||||
assert !this.replicatedBackup : "the replicated backup can't write the shared state!";
|
||||
LOGGER.debugf("writeSharedState state = %s", state);
|
||||
this.sharedStateManager.writeState(state);
|
||||
}
|
||||
|
||||
private SharedStateManager.State readSharedState() {
|
||||
return this.sharedStateManager.readState();
|
||||
final SharedStateManager.State state = this.sharedStateManager.readState();
|
||||
LOGGER.debugf("readSharedState state = %s", state);
|
||||
return state;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SimpleString readNodeId() {
|
||||
final UUID nodeId = this.sharedStateManager.readNodeId();
|
||||
LOGGER.debugf("readNodeId nodeId = %s", nodeId);
|
||||
setUUID(nodeId);
|
||||
return getNodeId();
|
||||
}
|
||||
|
|
|
@ -22,8 +22,6 @@ import java.sql.Connection;
|
|||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.sql.Timestamp;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
|
@ -39,10 +37,9 @@ import org.jboss.logging.Logger;
|
|||
final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedStateManager {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(JdbcSharedStateManager.class);
|
||||
public static final int MAX_SETUP_ATTEMPTS = 20;
|
||||
private static final int MAX_SETUP_ATTEMPTS = 20;
|
||||
private final String holderId;
|
||||
private final long lockExpirationMillis;
|
||||
private final long maxAllowedMillisFromDbTime;
|
||||
private JdbcLeaseLock liveLock;
|
||||
private JdbcLeaseLock backupLock;
|
||||
private PreparedStatement readNodeId;
|
||||
|
@ -50,16 +47,14 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
|
|||
private PreparedStatement initializeNodeId;
|
||||
private PreparedStatement readState;
|
||||
private PreparedStatement writeState;
|
||||
private long timeDifferenceMillisFromDb = 0;
|
||||
|
||||
public static JdbcSharedStateManager usingDataSource(String holderId,
|
||||
int networkTimeout,
|
||||
Executor networkTimeoutExecutor,
|
||||
long locksExpirationMillis,
|
||||
long maxAllowedMillisFromDbTime,
|
||||
DataSource dataSource,
|
||||
SQLProvider provider) {
|
||||
final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis, maxAllowedMillisFromDbTime);
|
||||
final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis);
|
||||
sharedStateManager.setNetworkTimeout(networkTimeoutExecutor, networkTimeout);
|
||||
sharedStateManager.setDataSource(dataSource);
|
||||
sharedStateManager.setSqlProvider(provider);
|
||||
|
@ -73,7 +68,6 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
|
|||
|
||||
public static JdbcSharedStateManager usingConnectionUrl(String holderId,
|
||||
long locksExpirationMillis,
|
||||
long maxAllowedMillisFromDbTime,
|
||||
String jdbcConnectionUrl,
|
||||
String jdbcDriverClass,
|
||||
SQLProvider provider) {
|
||||
|
@ -81,7 +75,6 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
|
|||
-1,
|
||||
null,
|
||||
locksExpirationMillis,
|
||||
maxAllowedMillisFromDbTime,
|
||||
jdbcConnectionUrl,
|
||||
jdbcDriverClass,
|
||||
provider);
|
||||
|
@ -91,11 +84,10 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
|
|||
int networkTimeout,
|
||||
Executor networkTimeoutExecutor,
|
||||
long locksExpirationMillis,
|
||||
long maxAllowedMillisFromDbTime,
|
||||
String jdbcConnectionUrl,
|
||||
String jdbcDriverClass,
|
||||
SQLProvider provider) {
|
||||
final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis, maxAllowedMillisFromDbTime);
|
||||
final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis);
|
||||
sharedStateManager.setNetworkTimeout(networkTimeoutExecutor, networkTimeout);
|
||||
sharedStateManager.setJdbcConnectionUrl(jdbcConnectionUrl);
|
||||
sharedStateManager.setJdbcDriverClass(jdbcDriverClass);
|
||||
|
@ -109,63 +101,33 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void createSchema() throws SQLException {
|
||||
protected void createSchema() {
|
||||
try {
|
||||
createTable(sqlProvider.createNodeManagerStoreTableSQL(), sqlProvider.createNodeIdSQL(), sqlProvider.createStateSQL(), sqlProvider.createLiveLockSQL(), sqlProvider.createBackupLockSQL());
|
||||
} catch (SQLException e) {
|
||||
//no op: if a table already exists is not a problem in this case, the prepareStatements() call will fail right after it if the table is not correctly initialized
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Error while creating the schema of the JDBC shared state manager", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* It computes the distance in milliseconds of {@link System#currentTimeMillis()} from the DBMS time.<br>
|
||||
* It must be added to {@link System#currentTimeMillis()} in order to approximate the DBMS time.
|
||||
* It will create a transaction by its own.
|
||||
*/
|
||||
static long timeDifferenceMillisFromDb(Connection connection, SQLProvider sqlProvider) throws SQLException {
|
||||
try (Statement statement = connection.createStatement()) {
|
||||
connection.setAutoCommit(false);
|
||||
final long result;
|
||||
try (ResultSet resultSet = statement.executeQuery(sqlProvider.currentTimestampSQL())) {
|
||||
resultSet.next();
|
||||
final Timestamp timestamp = resultSet.getTimestamp(1);
|
||||
final long systemNow = System.currentTimeMillis();
|
||||
result = timestamp.getTime() - systemNow;
|
||||
} catch (SQLException ie) {
|
||||
connection.rollback();
|
||||
connection.setAutoCommit(true);
|
||||
throw ie;
|
||||
}
|
||||
connection.commit();
|
||||
connection.setAutoCommit(true);
|
||||
return result;
|
||||
logger.debug("Error while creating the schema of the JDBC shared state manager", e);
|
||||
}
|
||||
}
|
||||
|
||||
static JdbcLeaseLock createLiveLock(String holderId,
|
||||
Connection connection,
|
||||
SQLProvider sqlProvider,
|
||||
long expirationMillis,
|
||||
long timeDifferenceMillisFromDb) throws SQLException {
|
||||
return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireLiveLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseLiveLockSQL()), connection.prepareStatement(sqlProvider.renewLiveLockSQL()), connection.prepareStatement(sqlProvider.isLiveLockedSQL()), expirationMillis, timeDifferenceMillisFromDb);
|
||||
long expirationMillis) throws SQLException {
|
||||
return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireLiveLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseLiveLockSQL()), connection.prepareStatement(sqlProvider.renewLiveLockSQL()), connection.prepareStatement(sqlProvider.isLiveLockedSQL()), connection.prepareStatement(sqlProvider.currentTimestampSQL()), expirationMillis, "LIVE");
|
||||
}
|
||||
|
||||
static JdbcLeaseLock createBackupLock(String holderId,
|
||||
Connection connection,
|
||||
SQLProvider sqlProvider,
|
||||
long expirationMillis,
|
||||
long timeDifferenceMillisFromDb) throws SQLException {
|
||||
return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireBackupLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseBackupLockSQL()), connection.prepareStatement(sqlProvider.renewBackupLockSQL()), connection.prepareStatement(sqlProvider.isBackupLockedSQL()), expirationMillis, timeDifferenceMillisFromDb);
|
||||
long expirationMillis) throws SQLException {
|
||||
return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireBackupLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseBackupLockSQL()), connection.prepareStatement(sqlProvider.renewBackupLockSQL()), connection.prepareStatement(sqlProvider.isBackupLockedSQL()), connection.prepareStatement(sqlProvider.currentTimestampSQL()), expirationMillis, "BACKUP");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void prepareStatements() throws SQLException {
|
||||
final long timeDifferenceMillisFromDb = validateTimeDifferenceMillisFromDb();
|
||||
this.liveLock = createLiveLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis, timeDifferenceMillisFromDb);
|
||||
this.backupLock = createBackupLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis, timeDifferenceMillisFromDb);
|
||||
this.liveLock = createLiveLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis);
|
||||
this.backupLock = createBackupLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis);
|
||||
this.readNodeId = connection.prepareStatement(sqlProvider.readNodeIdSQL());
|
||||
this.writeNodeId = connection.prepareStatement(sqlProvider.writeNodeIdSQL());
|
||||
this.initializeNodeId = connection.prepareStatement(sqlProvider.initializeNodeIdSQL());
|
||||
|
@ -173,32 +135,9 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
|
|||
this.readState = connection.prepareStatement(sqlProvider.readStateSQL());
|
||||
}
|
||||
|
||||
/**
|
||||
* It will be populated only after a {@link #start()}.
|
||||
*/
|
||||
long timeDifferenceMillisFromDb() {
|
||||
return timeDifferenceMillisFromDb;
|
||||
}
|
||||
|
||||
private long validateTimeDifferenceMillisFromDb() throws SQLException {
|
||||
final long timeDifferenceMillisFromDb = timeDifferenceMillisFromDb(connection, sqlProvider);
|
||||
this.timeDifferenceMillisFromDb = timeDifferenceMillisFromDb;
|
||||
final long absoluteTimeDifference = Math.abs(timeDifferenceMillisFromDb);
|
||||
if (absoluteTimeDifference > maxAllowedMillisFromDbTime) {
|
||||
throw new IllegalStateException("The system is far " + (-timeDifferenceMillisFromDb) + " milliseconds from DB time, exceeding maxAllowedMillisFromDbTime = " + maxAllowedMillisFromDbTime);
|
||||
}
|
||||
if (absoluteTimeDifference > 0) {
|
||||
final String msg = "The system is far " + timeDifferenceMillisFromDb + " milliseconds from DB time";
|
||||
final Logger.Level logLevel = absoluteTimeDifference > lockExpirationMillis ? Logger.Level.WARN : Logger.Level.DEBUG;
|
||||
logger.log(logLevel, msg);
|
||||
}
|
||||
return timeDifferenceMillisFromDb;
|
||||
}
|
||||
|
||||
private JdbcSharedStateManager(String holderId, long lockExpirationMillis, long maxAllowedMillisFromDbTime) {
|
||||
private JdbcSharedStateManager(String holderId, long lockExpirationMillis) {
|
||||
this.holderId = holderId;
|
||||
this.lockExpirationMillis = lockExpirationMillis;
|
||||
this.maxAllowedMillisFromDbTime = maxAllowedMillisFromDbTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -232,9 +171,13 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
|
|||
synchronized (connection) {
|
||||
try {
|
||||
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
|
||||
final boolean autoCommit = connection.getAutoCommit();
|
||||
connection.setAutoCommit(true);
|
||||
final UUID nodeId = rawReadNodeId();
|
||||
return nodeId;
|
||||
try {
|
||||
return rawReadNodeId();
|
||||
} finally {
|
||||
connection.setAutoCommit(autoCommit);
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
|
@ -246,8 +189,13 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
|
|||
synchronized (connection) {
|
||||
try {
|
||||
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
|
||||
final boolean autoCommit = connection.getAutoCommit();
|
||||
connection.setAutoCommit(true);
|
||||
rawWriteNodeId(nodeId);
|
||||
try {
|
||||
rawWriteNodeId(nodeId);
|
||||
} finally {
|
||||
connection.setAutoCommit(autoCommit);
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
|
@ -258,7 +206,7 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
|
|||
final PreparedStatement preparedStatement = this.writeNodeId;
|
||||
preparedStatement.setString(1, nodeId.toString());
|
||||
if (preparedStatement.executeUpdate() != 1) {
|
||||
throw new IllegalStateException("can't write NODE_ID on the JDBC Node Manager Store!");
|
||||
throw new IllegalStateException("can't write NodeId on the JDBC Node Manager Store!");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -283,9 +231,7 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
|
|||
return nodeId;
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Error while attempting to setup the NodeId", e);
|
||||
}
|
||||
logger.debug("Error while attempting to setup the NodeId", e);
|
||||
lastError = e;
|
||||
}
|
||||
}
|
||||
|
@ -299,36 +245,34 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
|
|||
}
|
||||
|
||||
private UUID initializeOrReadNodeId(final UUID newNodeId) throws SQLException {
|
||||
final UUID nodeId;
|
||||
connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
|
||||
connection.setAutoCommit(false);
|
||||
try {
|
||||
//optimistic try to initialize nodeId
|
||||
if (rawInitializeNodeId(newNodeId)) {
|
||||
nodeId = newNodeId;
|
||||
} else {
|
||||
nodeId = rawReadNodeId();
|
||||
synchronized (connection) {
|
||||
connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
|
||||
final boolean autoCommit = connection.getAutoCommit();
|
||||
connection.setAutoCommit(false);
|
||||
try {
|
||||
final UUID nodeId;
|
||||
//optimistic try to initialize nodeId
|
||||
if (rawInitializeNodeId(newNodeId)) {
|
||||
nodeId = newNodeId;
|
||||
} else {
|
||||
nodeId = rawReadNodeId();
|
||||
}
|
||||
if (nodeId != null) {
|
||||
connection.commit();
|
||||
return nodeId;
|
||||
} else {
|
||||
//rawInitializeNodeId has failed just due to contention or nodeId wasn't committed yet
|
||||
connection.rollback();
|
||||
logger.debugf("Rollback after failed to update NodeId to %s and haven't found any NodeId", newNodeId);
|
||||
return null;
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
connection.rollback();
|
||||
logger.debugf(e, "Rollback while trying to update NodeId to %s", newNodeId);
|
||||
return null;
|
||||
} finally {
|
||||
connection.setAutoCommit(autoCommit);
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
connection.rollback();
|
||||
connection.setAutoCommit(true);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Rollback while trying to update NodeId to " + newNodeId, e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
if (nodeId != null) {
|
||||
connection.commit();
|
||||
connection.setAutoCommit(true);
|
||||
return nodeId;
|
||||
} else {
|
||||
//that means that the rawInitializeNodeId has failed just due to contention or the nodeId wasn't committed yet
|
||||
connection.rollback();
|
||||
connection.setAutoCommit(true);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Rollback after failed to update NodeId to " + newNodeId + " and haven't found any NodeId");
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -370,17 +314,26 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
|
|||
synchronized (connection) {
|
||||
try {
|
||||
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
|
||||
connection.setAutoCommit(true);
|
||||
final boolean autoCommit = connection.getAutoCommit();
|
||||
connection.setAutoCommit(false);
|
||||
final State state;
|
||||
final PreparedStatement preparedStatement = this.readState;
|
||||
try (ResultSet resultSet = preparedStatement.executeQuery()) {
|
||||
if (!resultSet.next()) {
|
||||
state = State.FIRST_TIME_START;
|
||||
} else {
|
||||
state = decodeState(resultSet.getString(1));
|
||||
try {
|
||||
final PreparedStatement preparedStatement = this.readState;
|
||||
try (ResultSet resultSet = preparedStatement.executeQuery()) {
|
||||
if (!resultSet.next()) {
|
||||
state = State.FIRST_TIME_START;
|
||||
} else {
|
||||
state = decodeState(resultSet.getString(1));
|
||||
}
|
||||
}
|
||||
connection.commit();
|
||||
return state;
|
||||
} catch (SQLException ie) {
|
||||
connection.rollback();
|
||||
throw new IllegalStateException(ie);
|
||||
} finally {
|
||||
connection.setAutoCommit(autoCommit);
|
||||
}
|
||||
return state;
|
||||
} catch (SQLException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
|
@ -393,11 +346,21 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
|
|||
synchronized (connection) {
|
||||
try {
|
||||
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
|
||||
connection.setAutoCommit(true);
|
||||
final PreparedStatement preparedStatement = this.writeState;
|
||||
preparedStatement.setString(1, encodedState);
|
||||
if (preparedStatement.executeUpdate() != 1) {
|
||||
throw new IllegalStateException("can't write STATE to the JDBC Node Manager Store!");
|
||||
final boolean autoCommit = connection.getAutoCommit();
|
||||
connection.setAutoCommit(false);
|
||||
try {
|
||||
final PreparedStatement preparedStatement = this.writeState;
|
||||
preparedStatement.setString(1, encodedState);
|
||||
if (preparedStatement.executeUpdate() != 1) {
|
||||
throw new IllegalStateException("can't write state to the JDBC Node Manager Store!");
|
||||
}
|
||||
connection.commit();
|
||||
} catch (SQLException ie) {
|
||||
connection.rollback();
|
||||
connection.setAutoCommit(true);
|
||||
throw new IllegalStateException(ie);
|
||||
} finally {
|
||||
connection.setAutoCommit(autoCommit);
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
throw new IllegalStateException(e);
|
||||
|
@ -408,17 +371,15 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
|
|||
@Override
|
||||
public void stop() throws SQLException {
|
||||
//release all the managed resources inside the connection lock
|
||||
if (sqlProvider.closeConnectionOnShutdown()) {
|
||||
synchronized (connection) {
|
||||
this.readNodeId.close();
|
||||
this.writeNodeId.close();
|
||||
this.initializeNodeId.close();
|
||||
this.readState.close();
|
||||
this.writeState.close();
|
||||
this.liveLock.close();
|
||||
this.backupLock.close();
|
||||
super.stop();
|
||||
}
|
||||
synchronized (connection) {
|
||||
this.readNodeId.close();
|
||||
this.writeNodeId.close();
|
||||
this.initializeNodeId.close();
|
||||
this.readState.close();
|
||||
this.writeState.close();
|
||||
this.liveLock.close();
|
||||
this.backupLock.close();
|
||||
super.stop();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1970,13 +1970,6 @@
|
|||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
<xsd:element name="jdbc-max-allowed-millis-from-db-time" type="xsd:int" minOccurs="0" maxOccurs="1">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
The absolute time in milliseconds the system clock is allowed to be distant from the DB time
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
</xsd:all>
|
||||
<xsd:attributeGroup ref="xml:specialAttrs"/>
|
||||
</xsd:complexType>
|
||||
|
|
|
@ -71,8 +71,7 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
|
|||
UUID.randomUUID().toString(),
|
||||
jdbcSharedStateManager.getConnection(),
|
||||
sqlProvider,
|
||||
acquireMillis,
|
||||
jdbcSharedStateManager.timeDifferenceMillisFromDb());
|
||||
acquireMillis);
|
||||
} catch (SQLException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
|
@ -100,7 +99,6 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
|
|||
.usingConnectionUrl(
|
||||
UUID.randomUUID().toString(),
|
||||
dbConf.getJdbcLockExpirationMillis(),
|
||||
dbConf.getJdbcMaxAllowedMillisFromDbTime(),
|
||||
dbConf.getJdbcConnectionUrl(),
|
||||
dbConf.getJdbcDriverClassName(),
|
||||
sqlProvider);
|
||||
|
|
|
@ -52,7 +52,6 @@ public class JdbcSharedStateManagerTest extends ActiveMQTestBase {
|
|||
return JdbcSharedStateManager.usingConnectionUrl(
|
||||
UUID.randomUUID().toString(),
|
||||
dbConf.getJdbcLockExpirationMillis(),
|
||||
dbConf.getJdbcMaxAllowedMillisFromDbTime(),
|
||||
dbConf.getJdbcConnectionUrl(),
|
||||
dbConf.getJdbcDriverClassName(),
|
||||
sqlProvider);
|
||||
|
|
|
@ -479,7 +479,6 @@ public abstract class ActiveMQTestBase extends Assert {
|
|||
dbStorageConfiguration.setJdbcLockAcquisitionTimeoutMillis(getJdbcLockAcquisitionTimeoutMillis());
|
||||
dbStorageConfiguration.setJdbcLockExpirationMillis(getJdbcLockExpirationMillis());
|
||||
dbStorageConfiguration.setJdbcLockRenewPeriodMillis(getJdbcLockRenewPeriodMillis());
|
||||
dbStorageConfiguration.setJdbcMaxAllowedMillisFromDbTime(getJdbcMaxAllowedMillisFromDbTime());
|
||||
return dbStorageConfiguration;
|
||||
}
|
||||
|
||||
|
@ -495,10 +494,6 @@ public abstract class ActiveMQTestBase extends Assert {
|
|||
return Long.getLong("jdbc.lock.renew", ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis());
|
||||
}
|
||||
|
||||
protected long getJdbcMaxAllowedMillisFromDbTime() {
|
||||
return Long.getLong("jdbc.max.diff.db", ActiveMQDefaultConfiguration.getDefaultJdbcMaxAllowedMillisFromDbTime());
|
||||
}
|
||||
|
||||
public void destroyTables(List<String> tableNames) throws Exception {
|
||||
Driver driver = getDriver(getJDBCClassName());
|
||||
Connection connection = driver.connect(getTestJDBCConnectionUrl(), null);
|
||||
|
|
|
@ -468,10 +468,6 @@ To configure Apache ActiveMQ Artemis to use a database for persisting messages a
|
|||
The time in milliseconds a JDBC lock is considered valid without keeping it alive. The default value
|
||||
is 20000 milliseconds (ie 20 seconds).
|
||||
|
||||
- `jdbc-max-allowed-millis-from-db-time`
|
||||
|
||||
The absolute time in milliseconds the system clock is allowed to be distant from the DB time, otherwise a critical error will be raised. The default value is 60000 milliseconds (ie 60 seconds).
|
||||
|
||||
Note that some DBMS (e.g. Oracle, 30 chars) have restrictions on the size of table names, this should be taken into consideration when configuring table names for the Artemis database store, pay particular attention to the page store table name, which can be appended with a unique ID of up to 20 characters. (for Oracle this would mean configuring a page-store-table-name of max size of 10 chars).
|
||||
|
||||
## Zero Persistence
|
||||
|
|
Loading…
Reference in New Issue