diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java index 9da86713dd..e6e61e0c84 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java @@ -299,9 +299,6 @@ public class Create extends InputAbstract { @Option(name = "--jdbc-lock-expiration", description = "Lock expiration") long jdbcLockExpiration = ActiveMQDefaultConfiguration.getDefaultJdbcLockExpirationMillis(); - @Option(name = "--jdbc-max-allowed-millis-from-db-time", description = "Db time allowed difference") - long jdbcMaxAllowedMillisFromDbTime = ActiveMQDefaultConfiguration.getDefaultJdbcMaxAllowedMillisFromDbTime(); - private boolean IS_WINDOWS; private boolean IS_CYGWIN; @@ -629,7 +626,6 @@ public class Create extends InputAbstract { filters.put("${jdbcNetworkTimeout}", "" + jdbcNetworkTimeout); filters.put("${jdbcLockRenewPeriod}", "" + jdbcLockRenewPeriod); filters.put("${jdbcLockExpiration}", "" + jdbcLockExpiration); - filters.put("${jdbcMaxAllowedMillisFromDbTime}", "" + jdbcMaxAllowedMillisFromDbTime); filters.put("${jdbc}", readTextFile(ETC_DATABASE_STORE_TXT, filters)); } else { filters.put("${jdbc}", ""); diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/database-store.txt b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/database-store.txt index 2be2787e0f..564c513a81 100644 --- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/database-store.txt +++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/database-store.txt @@ -13,7 +13,6 @@ ${jdbcNodeManager} ${jdbcLockExpiration} ${jdbcLockRenewPeriod} - ${jdbcMaxAllowedMillisFromDbTime} ${jdbcNetworkTimeout} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index 2a634d7f21..c764408fc9 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -451,8 +451,6 @@ public final class ActiveMQDefaultConfiguration { private static final long DEFAULT_JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS = -1; - private static final long DEFAULT_JDBC_MAX_ALLOWED_MILLIS_FROM_DB_TIME = TimeUnit.SECONDS.toMillis(60); - // Default period to wait between connection TTL checks public static final long DEFAULT_CONNECTION_TTL_CHECK_INTERVAL = 2000; @@ -1258,10 +1256,6 @@ public final class ActiveMQDefaultConfiguration { return DEFAULT_JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS; } - public static long getDefaultJdbcMaxAllowedMillisFromDbTime() { - return DEFAULT_JDBC_MAX_ALLOWED_MILLIS_FROM_DB_TIME; - } - public static long getDefaultConnectionTtlCheckInterval() { return DEFAULT_CONNECTION_TTL_CHECK_INTERVAL; } diff --git a/artemis-jdbc-store/src/main/resources/journal-sql.properties b/artemis-jdbc-store/src/main/resources/journal-sql.properties index 0d1928d6e4..86e6a3d710 100644 --- a/artemis-jdbc-store/src/main/resources/journal-sql.properties +++ b/artemis-jdbc-store/src/main/resources/journal-sql.properties @@ -38,10 +38,10 @@ count-journal-record=SELECT COUNT(*) FROM %s create-node-manager-store-table=CREATE TABLE %s (ID INT NOT NULL, HOLDER_ID VARCHAR(128), HOLDER_EXPIRATION_TIME TIMESTAMP, NODE_ID CHAR(36),STATE CHAR(1), PRIMARY KEY(ID)) create-state=INSERT INTO %s (ID) VALUES (%s) -try-acquire-lock=UPDATE %s SET HOLDER_ID = ?, HOLDER_EXPIRATION_TIME = ? WHERE (HOLDER_EXPIRATION_TIME IS NULL OR HOLDER_EXPIRATION_TIME < CURRENT_TIMESTAMP) AND ID = %s +try-acquire-lock=UPDATE %s SET HOLDER_ID = ?, HOLDER_EXPIRATION_TIME = ? WHERE (HOLDER_EXPIRATION_TIME IS NULL OR (HOLDER_EXPIRATION_TIME < CURRENT_TIMESTAMP AND ? > CURRENT_TIMESTAMP)) AND ID = %s try-release-lock=UPDATE %s SET HOLDER_ID = NULL, HOLDER_EXPIRATION_TIME = NULL WHERE HOLDER_ID = ? AND ID = %s -is-locked=SELECT HOLDER_ID, HOLDER_EXPIRATION_TIME FROM %s WHERE ID = %s -renew-lock=UPDATE %s SET HOLDER_EXPIRATION_TIME = ? WHERE HOLDER_ID = ? AND ID = %s +is-locked=SELECT HOLDER_ID, HOLDER_EXPIRATION_TIME, CURRENT_TIMESTAMP FROM %s WHERE ID = %s +renew-lock=UPDATE %s SET HOLDER_EXPIRATION_TIME = ? WHERE HOLDER_ID = ? AND HOLDER_EXPIRATION_TIME IS NOT NULL AND ? > HOLDER_EXPIRATION_TIME AND ? > CURRENT_TIMESTAMP AND ID = %s current-timestamp=SELECT CURRENT_TIMESTAMP FROM %s write-state=UPDATE %s SET STATE = ? WHERE ID = %s read-state=SELECT STATE FROM %s WHERE ID = %s diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java index 4e04a40b6d..2707fb72cc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java @@ -50,8 +50,6 @@ public class DatabaseStorageConfiguration implements StoreConfiguration { private long jdbcLockAcquisitionTimeoutMillis = ActiveMQDefaultConfiguration.getDefaultJdbcLockAcquisitionTimeoutMillis(); - private long jdbcMaxAllowedMillisFromDbTime = ActiveMQDefaultConfiguration.getDefaultJdbcMaxAllowedMillisFromDbTime(); - private long jdbcJournalSyncPeriodMillis = ActiveMQDefaultConfiguration.getDefaultJdbcJournalSyncPeriodMillis(); @Override @@ -187,12 +185,4 @@ public class DatabaseStorageConfiguration implements StoreConfiguration { public void setJdbcJournalSyncPeriodMillis(long jdbcJournalSyncPeriodMillis) { this.jdbcJournalSyncPeriodMillis = jdbcJournalSyncPeriodMillis; } - - public long getJdbcMaxAllowedMillisFromDbTime() { - return jdbcMaxAllowedMillisFromDbTime; - } - - public void setJdbcMaxAllowedMillisFromDbTime(long jdbcMaxAllowedMillisFromDbTime) { - this.jdbcMaxAllowedMillisFromDbTime = jdbcMaxAllowedMillisFromDbTime; - } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index c4e4ea1340..0444ba8879 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -1473,7 +1473,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { conf.setJdbcNetworkTimeout(getInteger(storeNode, "jdbc-network-timeout", conf.getJdbcNetworkTimeout(), Validators.NO_CHECK)); conf.setJdbcLockRenewPeriodMillis(getLong(storeNode, "jdbc-lock-renew-period", conf.getJdbcLockRenewPeriodMillis(), Validators.NO_CHECK)); conf.setJdbcLockExpirationMillis(getLong(storeNode, "jdbc-lock-expiration", conf.getJdbcLockExpirationMillis(), Validators.NO_CHECK)); - conf.setJdbcMaxAllowedMillisFromDbTime(getLong(storeNode, "jdbc-max-allowed-millis-from-db-time", conf.getJdbcMaxAllowedMillisFromDbTime(), Validators.NO_CHECK)); conf.setJdbcJournalSyncPeriodMillis(getLong(storeNode, "jdbc-journal-sync-period", conf.getJdbcJournalSyncPeriodMillis(), Validators.NO_CHECK)); return conf; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java index 03f04ec290..11c1aabddc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java @@ -23,6 +23,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; import java.util.Objects; +import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import org.jboss.logging.Logger; @@ -35,14 +36,15 @@ final class JdbcLeaseLock implements LeaseLock { private static final Logger LOGGER = Logger.getLogger(JdbcLeaseLock.class); private static final int MAX_HOLDER_ID_LENGTH = 128; private final Connection connection; - private long millisDiffFromDbTime; private final String holderId; private final PreparedStatement tryAcquireLock; private final PreparedStatement tryReleaseLock; private final PreparedStatement renewLock; private final PreparedStatement isLocked; + private final PreparedStatement currentDateTime; private final long expirationMillis; private boolean maybeAcquired; + private final String lockName; /** * The lock will be responsible (ie {@link #close()}) of all the {@link PreparedStatement}s used by it, but not of the {@link Connection}, @@ -54,20 +56,22 @@ final class JdbcLeaseLock implements LeaseLock { PreparedStatement tryReleaseLock, PreparedStatement renewLock, PreparedStatement isLocked, + PreparedStatement currentDateTime, long expirationMIllis, - long millisDiffFromDbTime) { + String lockName) { if (holderId.length() > MAX_HOLDER_ID_LENGTH) { throw new IllegalArgumentException("holderId length must be <=" + MAX_HOLDER_ID_LENGTH); } this.holderId = holderId; - this.millisDiffFromDbTime = millisDiffFromDbTime; this.tryAcquireLock = tryAcquireLock; this.tryReleaseLock = tryReleaseLock; this.renewLock = renewLock; this.isLocked = isLocked; + this.currentDateTime = currentDateTime; this.expirationMillis = expirationMIllis; this.maybeAcquired = false; this.connection = connection; + this.lockName = lockName; } public String holderId() { @@ -79,32 +83,88 @@ final class JdbcLeaseLock implements LeaseLock { return expirationMillis; } - private long timeDifference() { - return millisDiffFromDbTime; + private String readableLockStatus() { + try { + connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); + final boolean autoCommit = connection.getAutoCommit(); + connection.setAutoCommit(false); + try { + final String lockStatus; + final PreparedStatement preparedStatement = this.isLocked; + try (ResultSet resultSet = preparedStatement.executeQuery()) { + if (!resultSet.next()) { + lockStatus = null; + } else { + final String currentHolderId = resultSet.getString(1); + final Timestamp expirationTime = resultSet.getTimestamp(2); + final Timestamp currentTimestamp = resultSet.getTimestamp(3); + lockStatus = "holderId = " + currentHolderId + " expirationTime = " + expirationTime + " currentTimestamp = " + currentTimestamp; + } + } + connection.commit(); + return lockStatus; + } catch (SQLException ie) { + connection.rollback(); + return ie.getMessage(); + } finally { + connection.setAutoCommit(autoCommit); + } + } catch (SQLException e) { + return e.getMessage(); + } + } + + private long dbCurrentTimeMillis() throws SQLException { + final long start = System.nanoTime(); + try (ResultSet resultSet = currentDateTime.executeQuery()) { + resultSet.next(); + final Timestamp currentTimestamp = resultSet.getTimestamp(1); + final long elapsedTime = System.nanoTime() - start; + if (LOGGER.isDebugEnabled()) { + LOGGER.debugf("[%s] %s query currentTimestamp = %s tooks %d ms", + lockName, holderId, currentTimestamp, TimeUnit.NANOSECONDS.toMillis(elapsedTime)); + } + return currentTimestamp.getTime(); + } } @Override public boolean renew() { synchronized (connection) { try { - final boolean result; + connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); + final boolean autoCommit = connection.getAutoCommit(); connection.setAutoCommit(false); try { - final long timeDifference = timeDifference(); final PreparedStatement preparedStatement = this.renewLock; - final long now = System.currentTimeMillis() + timeDifference; - final Timestamp timestamp = new Timestamp(now + expirationMillis); - preparedStatement.setTimestamp(1, timestamp); + final long now = dbCurrentTimeMillis(); + final Timestamp expirationTime = new Timestamp(now + expirationMillis); + if (LOGGER.isDebugEnabled()) { + LOGGER.debugf("[%s] %s is renewing lock with expirationTime = %s", + lockName, holderId, expirationTime); + } + preparedStatement.setTimestamp(1, expirationTime); preparedStatement.setString(2, holderId); - result = preparedStatement.executeUpdate() == 1; + preparedStatement.setTimestamp(3, expirationTime); + preparedStatement.setTimestamp(4, expirationTime); + final int updatedRows = preparedStatement.executeUpdate(); + final boolean renewed = updatedRows == 1; + connection.commit(); + if (!renewed) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debugf("[%s] %s has failed to renew lock: lock status = { %s }", + lockName, holderId, readableLockStatus()); + } + } else { + LOGGER.debugf("[%s] %s has renewed lock", lockName, holderId); + } + return renewed; } catch (SQLException ie) { connection.rollback(); - connection.setAutoCommit(true); throw new IllegalStateException(ie); + } finally { + connection.setAutoCommit(autoCommit); } - connection.commit(); - connection.setAutoCommit(true); - return result; } catch (SQLException e) { throw new IllegalStateException(e); } @@ -115,30 +175,36 @@ final class JdbcLeaseLock implements LeaseLock { public boolean tryAcquire() { synchronized (connection) { try { - final boolean acquired; + connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); + final boolean autoCommit = connection.getAutoCommit(); connection.setAutoCommit(false); try { - final long timeDifference = timeDifference(); final PreparedStatement preparedStatement = tryAcquireLock; - final long now = System.currentTimeMillis() + timeDifference; + final long now = dbCurrentTimeMillis(); preparedStatement.setString(1, holderId); - final Timestamp timestamp = new Timestamp(now + expirationMillis); - preparedStatement.setTimestamp(2, timestamp); - acquired = preparedStatement.executeUpdate() == 1; + final Timestamp expirationTime = new Timestamp(now + expirationMillis); + preparedStatement.setTimestamp(2, expirationTime); + preparedStatement.setTimestamp(3, expirationTime); + LOGGER.debugf("[%s] %s is trying to acquire lock with expirationTime %s", + lockName, holderId, expirationTime); + final boolean acquired = preparedStatement.executeUpdate() == 1; + connection.commit(); + if (acquired) { + this.maybeAcquired = true; + LOGGER.debugf("[%s] %s has acquired lock", lockName, holderId); + } else { + if (LOGGER.isDebugEnabled()) { + LOGGER.debugf("[%s] %s has failed to acquire lock: lock status = { %s }", + lockName, holderId, readableLockStatus()); + } + } + return acquired; } catch (SQLException ie) { connection.rollback(); - connection.setAutoCommit(true); throw new IllegalStateException(ie); + } finally { + connection.setAutoCommit(autoCommit); } - connection.commit(); - connection.setAutoCommit(true); - if (acquired) { - this.maybeAcquired = true; - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(holderId + " has acquired a lock"); - } - } - return acquired; } catch (SQLException e) { throw new IllegalStateException(e); } @@ -158,10 +224,11 @@ final class JdbcLeaseLock implements LeaseLock { private boolean checkValidHolderId(Predicate holderIdFilter) { synchronized (connection) { try { - boolean result; + connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); + final boolean autoCommit = connection.getAutoCommit(); connection.setAutoCommit(false); try { - final long timeDifference = timeDifference(); + boolean result; final PreparedStatement preparedStatement = this.isLocked; try (ResultSet resultSet = preparedStatement.executeQuery()) { if (!resultSet.next()) { @@ -169,29 +236,33 @@ final class JdbcLeaseLock implements LeaseLock { } else { final String currentHolderId = resultSet.getString(1); result = holderIdFilter.test(currentHolderId); - //warn about any zombie lock - final Timestamp timestamp = resultSet.getTimestamp(2); - if (timestamp != null) { - final long lockExpirationTime = timestamp.getTime(); - final long now = System.currentTimeMillis() + timeDifference; - final long expiredBy = now - lockExpirationTime; + final Timestamp expirationTime = resultSet.getTimestamp(2); + final Timestamp currentTimestamp = resultSet.getTimestamp(3); + final long currentTimestampMillis = currentTimestamp.getTime(); + boolean zombie = false; + if (expirationTime != null) { + final long lockExpirationTime = expirationTime.getTime(); + final long expiredBy = currentTimestampMillis - lockExpirationTime; if (expiredBy > 0) { result = false; - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("found zombie lock with holderId: " + currentHolderId + " expired by: " + expiredBy + " ms"); - } + zombie = true; } } + if (LOGGER.isDebugEnabled()) { + LOGGER.debugf("[%s] %s has found %s with holderId = %s expirationTime = %s currentTimestamp = %s", + lockName, holderId, zombie ? "zombie lock" : "lock", + currentHolderId, expirationTime, currentTimestamp); + } } } + connection.commit(); + return result; } catch (SQLException ie) { connection.rollback(); - connection.setAutoCommit(true); throw new IllegalStateException(ie); + } finally { + connection.setAutoCommit(autoCommit); } - connection.commit(); - connection.setAutoCommit(true); - return result; } catch (SQLException e) { throw new IllegalStateException(e); } @@ -202,26 +273,30 @@ final class JdbcLeaseLock implements LeaseLock { public void release() { synchronized (connection) { try { + connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); + final boolean autoCommit = connection.getAutoCommit(); connection.setAutoCommit(false); try { final PreparedStatement preparedStatement = this.tryReleaseLock; preparedStatement.setString(1, holderId); - if (preparedStatement.executeUpdate() != 1) { - LOGGER.warn(holderId + " has failed to release a lock"); - } else { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(holderId + " has released a lock"); - } - } + final boolean released = preparedStatement.executeUpdate() == 1; //consider it as released to avoid on finalize to be reclaimed this.maybeAcquired = false; + connection.commit(); + if (!released) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debugf("[%s] %s has failed to release lock: lock status = { %s }", + lockName, holderId, readableLockStatus()); + } + } else { + LOGGER.debugf("[%s] %s has released lock", lockName, holderId); + } } catch (SQLException ie) { connection.rollback(); - connection.setAutoCommit(true); throw new IllegalStateException(ie); + } finally { + connection.setAutoCommit(autoCommit); } - connection.commit(); - connection.setAutoCommit(true); } catch (SQLException e) { throw new IllegalStateException(e); } @@ -242,6 +317,7 @@ final class JdbcLeaseLock implements LeaseLock { this.tryAcquireLock.close(); this.renewLock.close(); this.isLocked.close(); + this.currentDateTime.close(); } } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java index fb76f0434d..322ea38654 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java @@ -41,7 +41,7 @@ import org.jboss.logging.Logger; */ public final class JdbcNodeManager extends NodeManager { - private static final Logger logger = Logger.getLogger(JdbcNodeManager.class); + private static final Logger LOGGER = Logger.getLogger(JdbcNodeManager.class); private static final long MAX_PAUSE_MILLIS = 2000L; private final Supplier sharedStateManagerFactory; @@ -50,7 +50,6 @@ public final class JdbcNodeManager extends NodeManager { private SharedStateManager sharedStateManager; private ScheduledLeaseLock scheduledLiveLock; private ScheduledLeaseLock scheduledBackupLock; - private final long lockRenewPeriodMillis; private final long lockAcquisitionTimeoutMillis; private volatile boolean interrupted = false; private final LeaseLock.Pauser pauser; @@ -74,7 +73,6 @@ public final class JdbcNodeManager extends NodeManager { configuration.getJdbcLockExpirationMillis(), configuration.getJdbcLockRenewPeriodMillis(), configuration.getJdbcLockAcquisitionTimeoutMillis(), - configuration.getJdbcMaxAllowedMillisFromDbTime(), configuration.getDataSource(), sqlProviderFactory.create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER), scheduledExecutorService, @@ -88,7 +86,6 @@ public final class JdbcNodeManager extends NodeManager { configuration.getJdbcLockExpirationMillis(), configuration.getJdbcLockRenewPeriodMillis(), configuration.getJdbcLockAcquisitionTimeoutMillis(), - configuration.getJdbcMaxAllowedMillisFromDbTime(), configuration.getJdbcConnectionUrl(), configuration.getJdbcDriverClassName(), sqlProvider, @@ -103,7 +100,6 @@ public final class JdbcNodeManager extends NodeManager { long lockExpirationMillis, long lockRenewPeriodMillis, long lockAcquisitionTimeoutMillis, - long maxAllowedMillisFromDbTime, DataSource dataSource, SQLProvider provider, ScheduledExecutorService scheduledExecutorService, @@ -114,10 +110,8 @@ public final class JdbcNodeManager extends NodeManager { networkTimeoutMillis, executorFactory == null ? null : executorFactory.getExecutor(), lockExpirationMillis, - maxAllowedMillisFromDbTime, dataSource, provider), - false, lockRenewPeriodMillis, lockAcquisitionTimeoutMillis, scheduledExecutorService, @@ -130,7 +124,6 @@ public final class JdbcNodeManager extends NodeManager { long lockExpirationMillis, long lockRenewPeriodMillis, long lockAcquisitionTimeoutMillis, - long maxAllowedMillisFromDbTime, String jdbcUrl, String driverClass, SQLProvider provider, @@ -142,11 +135,9 @@ public final class JdbcNodeManager extends NodeManager { networkTimeoutMillis, executorFactory == null ? null : executorFactory.getExecutor(), lockExpirationMillis, - maxAllowedMillisFromDbTime, jdbcUrl, driverClass, provider), - false, lockRenewPeriodMillis, lockAcquisitionTimeoutMillis, scheduledExecutorService, @@ -169,24 +160,22 @@ public final class JdbcNodeManager extends NodeManager { final int networkTimeout = configuration.getJdbcNetworkTimeout(); if (networkTimeout >= 0) { if (networkTimeout > lockExpiration) { - logger.warn("jdbc-network-timeout isn't properly configured: the recommended value is <= jdbc-lock-expiration"); + LOGGER.warn("jdbc-network-timeout isn't properly configured: the recommended value is <= jdbc-lock-expiration"); } } else { - logger.warn("jdbc-network-timeout isn't properly configured: the recommended value is <= jdbc-lock-expiration"); + LOGGER.warn("jdbc-network-timeout isn't properly configured: the recommended value is <= jdbc-lock-expiration"); } } private JdbcNodeManager(Supplier sharedStateManagerFactory, - boolean replicatedBackup, long lockRenewPeriodMillis, long lockAcquisitionTimeoutMillis, ScheduledExecutorService scheduledExecutorService, ExecutorFactory executorFactory, IOCriticalErrorListener ioCriticalErrorListener) { - super(replicatedBackup, null); + super(false, null); this.lockAcquisitionTimeoutMillis = lockAcquisitionTimeoutMillis; - this.lockRenewPeriodMillis = lockRenewPeriodMillis; - this.pauser = LeaseLock.Pauser.sleep(Math.min(this.lockRenewPeriodMillis, MAX_PAUSE_MILLIS), TimeUnit.MILLISECONDS); + this.pauser = LeaseLock.Pauser.sleep(Math.min(lockRenewPeriodMillis, MAX_PAUSE_MILLIS), TimeUnit.MILLISECONDS); this.sharedStateManagerFactory = sharedStateManagerFactory; this.scheduledLiveLockFactory = () -> ScheduledLeaseLock.of( scheduledExecutorService, @@ -217,10 +206,9 @@ public final class JdbcNodeManager extends NodeManager { return; } this.sharedStateManager = sharedStateManagerFactory.get(); - if (!replicatedBackup) { - final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID); - setUUID(nodeId); - } + LOGGER.debug("setup sharedStateManager on start"); + final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID); + setUUID(nodeId); this.scheduledLiveLock = scheduledLiveLockFactory.get(); this.scheduledBackupLock = scheduledBackupLockFactory.get(); super.start(); @@ -259,35 +247,62 @@ public final class JdbcNodeManager extends NodeManager { @Override public boolean isAwaitingFailback() throws Exception { - return readSharedState() == SharedStateManager.State.FAILING_BACK; + LOGGER.debug("ENTER isAwaitingFailback"); + try { + return readSharedState() == SharedStateManager.State.FAILING_BACK; + } finally { + LOGGER.debug("EXIT isAwaitingFailback"); + } } @Override public boolean isBackupLive() throws Exception { - //is anyone holding the live lock? - return this.scheduledLiveLock.lock().isHeld(); + LOGGER.debug("ENTER isBackupLive"); + try { + //is anyone holding the live lock? + return this.scheduledLiveLock.lock().isHeld(); + } finally { + LOGGER.debug("EXIT isBackupLive"); + } } @Override public void stopBackup() throws Exception { - if (replicatedBackup) { - final UUID nodeId = getUUID(); - sharedStateManager.writeNodeId(nodeId); + LOGGER.debug("ENTER stopBackup"); + try { + if (this.scheduledBackupLock.isStarted()) { + LOGGER.debug("scheduledBackupLock is running: stop it and release backup lock"); + this.scheduledBackupLock.stop(); + this.scheduledBackupLock.lock().release(); + } else { + LOGGER.debug("scheduledBackupLock is not running"); + } + } finally { + LOGGER.debug("EXIT stopBackup"); } - releaseBackup(); } @Override public void interrupt() { + LOGGER.debug("ENTER interrupted"); //need to be volatile: must be called concurrently to work as expected interrupted = true; + LOGGER.debug("EXIT interrupted"); } @Override public void releaseBackup() throws Exception { - if (this.scheduledBackupLock.lock().isHeldByCaller()) { - this.scheduledBackupLock.stop(); - this.scheduledBackupLock.lock().release(); + LOGGER.debug("ENTER releaseBackup"); + try { + if (this.scheduledBackupLock.isStarted()) { + LOGGER.debug("scheduledBackupLock is running: stop it and release backup lock"); + this.scheduledBackupLock.stop(); + this.scheduledBackupLock.lock().release(); + } else { + LOGGER.debug("scheduledBackupLock is not running"); + } + } finally { + LOGGER.debug("EXIT releaseBackup"); } } @@ -322,11 +337,8 @@ public final class JdbcNodeManager extends NodeManager { if (acquiredMillis > this.scheduledLiveLock.renewPeriodMillis()) { if (!this.scheduledLiveLock.lock().renew()) { final IllegalStateException e = new IllegalStateException("live lock can't be renewed"); - try { - ioCriticalErrorListener.onIOException(e, "live lock can't be renewed", null); - } finally { - throw e; - } + ioCriticalErrorListener.onIOException(e, "live lock can't be renewed", null); + throw e; } } } @@ -343,7 +355,7 @@ public final class JdbcNodeManager extends NodeManager { try { stateWhileLocked = readSharedState(); } catch (Throwable t) { - logger.error("error while holding the live node lock and tried to read the shared state", t); + LOGGER.error("error while holding the live node lock and tried to read the shared state", t); this.scheduledLiveLock.lock().release(); throw t; } @@ -351,9 +363,7 @@ public final class JdbcNodeManager extends NodeManager { renewLiveLockIfNeeded(acquiredOn); liveWhileLocked = true; } else { - if (logger.isDebugEnabled()) { - logger.debug("state is " + stateWhileLocked + " while holding the live lock"); - } + LOGGER.debugf("state is %s while holding the live lock: releasing live lock", stateWhileLocked); //state is not live: can (try to) release the lock this.scheduledLiveLock.lock().release(); } @@ -362,98 +372,133 @@ public final class JdbcNodeManager extends NodeManager { @Override public void awaitLiveNode() throws Exception { - boolean liveWhileLocked = false; - while (!liveWhileLocked) { - //check first without holding any lock - final SharedStateManager.State state = readSharedState(); - if (state == SharedStateManager.State.LIVE) { - //verify if the state is live while holding the live node lock too - liveWhileLocked = lockLiveAndCheckLiveState(); - } else { - if (logger.isDebugEnabled()) { - logger.debug("awaiting live node...state: " + state); + LOGGER.debug("ENTER awaitLiveNode"); + try { + boolean liveWhileLocked = false; + while (!liveWhileLocked) { + //check first without holding any lock + final SharedStateManager.State state = readSharedState(); + if (state == SharedStateManager.State.LIVE) { + //verify if the state is live while holding the live node lock too + liveWhileLocked = lockLiveAndCheckLiveState(); + } else { + LOGGER.debugf("state while awaiting live node: %s", state); + } + if (!liveWhileLocked) { + checkInterrupted(() -> "awaitLiveNode got interrupted!"); + pauser.idle(); } } - if (!liveWhileLocked) { - checkInterrupted(() -> "awaitLiveNode got interrupted!"); - pauser.idle(); - } + //state is LIVE and live lock is acquired and valid + LOGGER.debugf("acquired live node lock while state is %s: starting scheduledLiveLock", SharedStateManager.State.LIVE); + this.scheduledLiveLock.start(); + } finally { + LOGGER.debug("EXIT awaitLiveNode"); } - //state is LIVE and live lock is acquired and valid - logger.debug("acquired live node lock"); - this.scheduledLiveLock.start(); } @Override public void startBackup() throws Exception { - assert !replicatedBackup; // should not be called if this is a replicating backup - ActiveMQServerLogger.LOGGER.waitingToBecomeBackup(); + LOGGER.debug("ENTER startBackup"); + try { + ActiveMQServerLogger.LOGGER.waitingToBecomeBackup(); - lock(scheduledBackupLock.lock()); - scheduledBackupLock.start(); - ActiveMQServerLogger.LOGGER.gotBackupLock(); - if (getUUID() == null) - readNodeId(); + lock(scheduledBackupLock.lock()); + scheduledBackupLock.start(); + ActiveMQServerLogger.LOGGER.gotBackupLock(); + if (getUUID() == null) + readNodeId(); + } finally { + LOGGER.debug("EXIT startBackup"); + } } @Override public ActivateCallback startLiveNode() throws Exception { - setFailingBack(); + LOGGER.debug("ENTER startLiveNode"); + try { + setFailingBack(); - final String timeoutMessage = lockAcquisitionTimeoutMillis == -1 ? "indefinitely" : lockAcquisitionTimeoutMillis + " milliseconds"; + final String timeoutMessage = lockAcquisitionTimeoutMillis == -1 ? "indefinitely" : lockAcquisitionTimeoutMillis + " milliseconds"; - ActiveMQServerLogger.LOGGER.waitingToObtainLiveLock(timeoutMessage); + ActiveMQServerLogger.LOGGER.waitingToObtainLiveLock(timeoutMessage); - lock(this.scheduledLiveLock.lock()); + lock(this.scheduledLiveLock.lock()); - this.scheduledLiveLock.start(); + this.scheduledLiveLock.start(); - ActiveMQServerLogger.LOGGER.obtainedLiveLock(); + ActiveMQServerLogger.LOGGER.obtainedLiveLock(); - return new ActivateCallback() { - @Override - public void activationComplete() { - try { - //state can be written only if the live renew task is running - setLive(); - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + return new ActivateCallback() { + @Override + public void activationComplete() { + LOGGER.debug("ENTER activationComplete"); + try { + //state can be written only if the live renew task is running + setLive(); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + } finally { + LOGGER.debug("EXIT activationComplete"); + } } - } - }; + }; + } finally { + LOGGER.debug("EXIT startLiveNode"); + } } @Override public void pauseLiveServer() throws Exception { - if (scheduledLiveLock.isStarted()) { - setPaused(); - scheduledLiveLock.stop(); - scheduledLiveLock.lock().release(); - } else if (scheduledLiveLock.lock().renew()) { - setPaused(); - scheduledLiveLock.lock().release(); - } else { - final IllegalStateException e = new IllegalStateException("live lock can't be renewed"); - try { - ioCriticalErrorListener.onIOException(e, "live lock can't be renewed on pauseLiveServer", null); - } finally { - throw e; + LOGGER.debug("ENTER pauseLiveServer"); + try { + if (scheduledLiveLock.isStarted()) { + LOGGER.debug("scheduledLiveLock is running: set paused shared state, stop it and release live lock"); + setPaused(); + scheduledLiveLock.stop(); + scheduledLiveLock.lock().release(); + } else { + LOGGER.debug("scheduledLiveLock is not running: try renew live lock"); + if (scheduledLiveLock.lock().renew()) { + LOGGER.debug("live lock renewed: set paused shared state and release live lock"); + setPaused(); + scheduledLiveLock.lock().release(); + } else { + final IllegalStateException e = new IllegalStateException("live lock can't be renewed"); + ioCriticalErrorListener.onIOException(e, "live lock can't be renewed on pauseLiveServer", null); + throw e; + } } + } finally { + LOGGER.debug("EXIT pauseLiveServer"); } } @Override public void crashLiveServer() throws Exception { - if (this.scheduledLiveLock.lock().isHeldByCaller()) { - scheduledLiveLock.stop(); - this.scheduledLiveLock.lock().release(); + LOGGER.debug("ENTER crashLiveServer"); + try { + if (this.scheduledLiveLock.isStarted()) { + LOGGER.debug("scheduledLiveLock is running: request stop it and release live lock"); + this.scheduledLiveLock.stop(); + this.scheduledLiveLock.lock().release(); + } else { + LOGGER.debug("scheduledLiveLock is not running"); + } + } finally { + LOGGER.debug("EXIT crashLiveServer"); } } @Override public void awaitLiveStatus() { - while (readSharedState() != SharedStateManager.State.LIVE) { - pauser.idle(); + LOGGER.debug("ENTER awaitLiveStatus"); + try { + while (readSharedState() != SharedStateManager.State.LIVE) { + pauser.idle(); + } + } finally { + LOGGER.debug("EXIT awaitLiveStatus"); } } @@ -470,17 +515,20 @@ public final class JdbcNodeManager extends NodeManager { } private void writeSharedState(SharedStateManager.State state) { - assert !this.replicatedBackup : "the replicated backup can't write the shared state!"; + LOGGER.debugf("writeSharedState state = %s", state); this.sharedStateManager.writeState(state); } private SharedStateManager.State readSharedState() { - return this.sharedStateManager.readState(); + final SharedStateManager.State state = this.sharedStateManager.readState(); + LOGGER.debugf("readSharedState state = %s", state); + return state; } @Override public SimpleString readNodeId() { final UUID nodeId = this.sharedStateManager.readNodeId(); + LOGGER.debugf("readNodeId nodeId = %s", nodeId); setUUID(nodeId); return getNodeId(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java index a8b07e9309..9357435bbc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java @@ -22,8 +22,6 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.Statement; -import java.sql.Timestamp; import java.util.concurrent.Executor; import java.util.function.Supplier; @@ -39,10 +37,9 @@ import org.jboss.logging.Logger; final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedStateManager { private static final Logger logger = Logger.getLogger(JdbcSharedStateManager.class); - public static final int MAX_SETUP_ATTEMPTS = 20; + private static final int MAX_SETUP_ATTEMPTS = 20; private final String holderId; private final long lockExpirationMillis; - private final long maxAllowedMillisFromDbTime; private JdbcLeaseLock liveLock; private JdbcLeaseLock backupLock; private PreparedStatement readNodeId; @@ -50,16 +47,14 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS private PreparedStatement initializeNodeId; private PreparedStatement readState; private PreparedStatement writeState; - private long timeDifferenceMillisFromDb = 0; public static JdbcSharedStateManager usingDataSource(String holderId, int networkTimeout, Executor networkTimeoutExecutor, long locksExpirationMillis, - long maxAllowedMillisFromDbTime, DataSource dataSource, SQLProvider provider) { - final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis, maxAllowedMillisFromDbTime); + final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis); sharedStateManager.setNetworkTimeout(networkTimeoutExecutor, networkTimeout); sharedStateManager.setDataSource(dataSource); sharedStateManager.setSqlProvider(provider); @@ -73,7 +68,6 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS public static JdbcSharedStateManager usingConnectionUrl(String holderId, long locksExpirationMillis, - long maxAllowedMillisFromDbTime, String jdbcConnectionUrl, String jdbcDriverClass, SQLProvider provider) { @@ -81,7 +75,6 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS -1, null, locksExpirationMillis, - maxAllowedMillisFromDbTime, jdbcConnectionUrl, jdbcDriverClass, provider); @@ -91,11 +84,10 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS int networkTimeout, Executor networkTimeoutExecutor, long locksExpirationMillis, - long maxAllowedMillisFromDbTime, String jdbcConnectionUrl, String jdbcDriverClass, SQLProvider provider) { - final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis, maxAllowedMillisFromDbTime); + final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis); sharedStateManager.setNetworkTimeout(networkTimeoutExecutor, networkTimeout); sharedStateManager.setJdbcConnectionUrl(jdbcConnectionUrl); sharedStateManager.setJdbcDriverClass(jdbcDriverClass); @@ -109,63 +101,33 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS } @Override - protected void createSchema() throws SQLException { + protected void createSchema() { try { createTable(sqlProvider.createNodeManagerStoreTableSQL(), sqlProvider.createNodeIdSQL(), sqlProvider.createStateSQL(), sqlProvider.createLiveLockSQL(), sqlProvider.createBackupLockSQL()); } catch (SQLException e) { //no op: if a table already exists is not a problem in this case, the prepareStatements() call will fail right after it if the table is not correctly initialized - if (logger.isDebugEnabled()) { - logger.debug("Error while creating the schema of the JDBC shared state manager", e); - } - } - } - - /** - * It computes the distance in milliseconds of {@link System#currentTimeMillis()} from the DBMS time.
- * It must be added to {@link System#currentTimeMillis()} in order to approximate the DBMS time. - * It will create a transaction by its own. - */ - static long timeDifferenceMillisFromDb(Connection connection, SQLProvider sqlProvider) throws SQLException { - try (Statement statement = connection.createStatement()) { - connection.setAutoCommit(false); - final long result; - try (ResultSet resultSet = statement.executeQuery(sqlProvider.currentTimestampSQL())) { - resultSet.next(); - final Timestamp timestamp = resultSet.getTimestamp(1); - final long systemNow = System.currentTimeMillis(); - result = timestamp.getTime() - systemNow; - } catch (SQLException ie) { - connection.rollback(); - connection.setAutoCommit(true); - throw ie; - } - connection.commit(); - connection.setAutoCommit(true); - return result; + logger.debug("Error while creating the schema of the JDBC shared state manager", e); } } static JdbcLeaseLock createLiveLock(String holderId, Connection connection, SQLProvider sqlProvider, - long expirationMillis, - long timeDifferenceMillisFromDb) throws SQLException { - return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireLiveLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseLiveLockSQL()), connection.prepareStatement(sqlProvider.renewLiveLockSQL()), connection.prepareStatement(sqlProvider.isLiveLockedSQL()), expirationMillis, timeDifferenceMillisFromDb); + long expirationMillis) throws SQLException { + return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireLiveLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseLiveLockSQL()), connection.prepareStatement(sqlProvider.renewLiveLockSQL()), connection.prepareStatement(sqlProvider.isLiveLockedSQL()), connection.prepareStatement(sqlProvider.currentTimestampSQL()), expirationMillis, "LIVE"); } static JdbcLeaseLock createBackupLock(String holderId, Connection connection, SQLProvider sqlProvider, - long expirationMillis, - long timeDifferenceMillisFromDb) throws SQLException { - return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireBackupLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseBackupLockSQL()), connection.prepareStatement(sqlProvider.renewBackupLockSQL()), connection.prepareStatement(sqlProvider.isBackupLockedSQL()), expirationMillis, timeDifferenceMillisFromDb); + long expirationMillis) throws SQLException { + return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireBackupLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseBackupLockSQL()), connection.prepareStatement(sqlProvider.renewBackupLockSQL()), connection.prepareStatement(sqlProvider.isBackupLockedSQL()), connection.prepareStatement(sqlProvider.currentTimestampSQL()), expirationMillis, "BACKUP"); } @Override protected void prepareStatements() throws SQLException { - final long timeDifferenceMillisFromDb = validateTimeDifferenceMillisFromDb(); - this.liveLock = createLiveLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis, timeDifferenceMillisFromDb); - this.backupLock = createBackupLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis, timeDifferenceMillisFromDb); + this.liveLock = createLiveLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis); + this.backupLock = createBackupLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis); this.readNodeId = connection.prepareStatement(sqlProvider.readNodeIdSQL()); this.writeNodeId = connection.prepareStatement(sqlProvider.writeNodeIdSQL()); this.initializeNodeId = connection.prepareStatement(sqlProvider.initializeNodeIdSQL()); @@ -173,32 +135,9 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS this.readState = connection.prepareStatement(sqlProvider.readStateSQL()); } - /** - * It will be populated only after a {@link #start()}. - */ - long timeDifferenceMillisFromDb() { - return timeDifferenceMillisFromDb; - } - - private long validateTimeDifferenceMillisFromDb() throws SQLException { - final long timeDifferenceMillisFromDb = timeDifferenceMillisFromDb(connection, sqlProvider); - this.timeDifferenceMillisFromDb = timeDifferenceMillisFromDb; - final long absoluteTimeDifference = Math.abs(timeDifferenceMillisFromDb); - if (absoluteTimeDifference > maxAllowedMillisFromDbTime) { - throw new IllegalStateException("The system is far " + (-timeDifferenceMillisFromDb) + " milliseconds from DB time, exceeding maxAllowedMillisFromDbTime = " + maxAllowedMillisFromDbTime); - } - if (absoluteTimeDifference > 0) { - final String msg = "The system is far " + timeDifferenceMillisFromDb + " milliseconds from DB time"; - final Logger.Level logLevel = absoluteTimeDifference > lockExpirationMillis ? Logger.Level.WARN : Logger.Level.DEBUG; - logger.log(logLevel, msg); - } - return timeDifferenceMillisFromDb; - } - - private JdbcSharedStateManager(String holderId, long lockExpirationMillis, long maxAllowedMillisFromDbTime) { + private JdbcSharedStateManager(String holderId, long lockExpirationMillis) { this.holderId = holderId; this.lockExpirationMillis = lockExpirationMillis; - this.maxAllowedMillisFromDbTime = maxAllowedMillisFromDbTime; } @Override @@ -232,9 +171,13 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS synchronized (connection) { try { connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); + final boolean autoCommit = connection.getAutoCommit(); connection.setAutoCommit(true); - final UUID nodeId = rawReadNodeId(); - return nodeId; + try { + return rawReadNodeId(); + } finally { + connection.setAutoCommit(autoCommit); + } } catch (SQLException e) { throw new IllegalStateException(e); } @@ -246,8 +189,13 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS synchronized (connection) { try { connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); + final boolean autoCommit = connection.getAutoCommit(); connection.setAutoCommit(true); - rawWriteNodeId(nodeId); + try { + rawWriteNodeId(nodeId); + } finally { + connection.setAutoCommit(autoCommit); + } } catch (SQLException e) { throw new IllegalStateException(e); } @@ -258,7 +206,7 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS final PreparedStatement preparedStatement = this.writeNodeId; preparedStatement.setString(1, nodeId.toString()); if (preparedStatement.executeUpdate() != 1) { - throw new IllegalStateException("can't write NODE_ID on the JDBC Node Manager Store!"); + throw new IllegalStateException("can't write NodeId on the JDBC Node Manager Store!"); } } @@ -283,9 +231,7 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS return nodeId; } } catch (SQLException e) { - if (logger.isDebugEnabled()) { - logger.debug("Error while attempting to setup the NodeId", e); - } + logger.debug("Error while attempting to setup the NodeId", e); lastError = e; } } @@ -299,36 +245,34 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS } private UUID initializeOrReadNodeId(final UUID newNodeId) throws SQLException { - final UUID nodeId; - connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); - connection.setAutoCommit(false); - try { - //optimistic try to initialize nodeId - if (rawInitializeNodeId(newNodeId)) { - nodeId = newNodeId; - } else { - nodeId = rawReadNodeId(); + synchronized (connection) { + connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); + final boolean autoCommit = connection.getAutoCommit(); + connection.setAutoCommit(false); + try { + final UUID nodeId; + //optimistic try to initialize nodeId + if (rawInitializeNodeId(newNodeId)) { + nodeId = newNodeId; + } else { + nodeId = rawReadNodeId(); + } + if (nodeId != null) { + connection.commit(); + return nodeId; + } else { + //rawInitializeNodeId has failed just due to contention or nodeId wasn't committed yet + connection.rollback(); + logger.debugf("Rollback after failed to update NodeId to %s and haven't found any NodeId", newNodeId); + return null; + } + } catch (SQLException e) { + connection.rollback(); + logger.debugf(e, "Rollback while trying to update NodeId to %s", newNodeId); + return null; + } finally { + connection.setAutoCommit(autoCommit); } - } catch (SQLException e) { - connection.rollback(); - connection.setAutoCommit(true); - if (logger.isDebugEnabled()) { - logger.debug("Rollback while trying to update NodeId to " + newNodeId, e); - } - return null; - } - if (nodeId != null) { - connection.commit(); - connection.setAutoCommit(true); - return nodeId; - } else { - //that means that the rawInitializeNodeId has failed just due to contention or the nodeId wasn't committed yet - connection.rollback(); - connection.setAutoCommit(true); - if (logger.isDebugEnabled()) { - logger.debug("Rollback after failed to update NodeId to " + newNodeId + " and haven't found any NodeId"); - } - return null; } } @@ -370,17 +314,26 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS synchronized (connection) { try { connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); - connection.setAutoCommit(true); + final boolean autoCommit = connection.getAutoCommit(); + connection.setAutoCommit(false); final State state; - final PreparedStatement preparedStatement = this.readState; - try (ResultSet resultSet = preparedStatement.executeQuery()) { - if (!resultSet.next()) { - state = State.FIRST_TIME_START; - } else { - state = decodeState(resultSet.getString(1)); + try { + final PreparedStatement preparedStatement = this.readState; + try (ResultSet resultSet = preparedStatement.executeQuery()) { + if (!resultSet.next()) { + state = State.FIRST_TIME_START; + } else { + state = decodeState(resultSet.getString(1)); + } } + connection.commit(); + return state; + } catch (SQLException ie) { + connection.rollback(); + throw new IllegalStateException(ie); + } finally { + connection.setAutoCommit(autoCommit); } - return state; } catch (SQLException e) { throw new IllegalStateException(e); } @@ -393,11 +346,21 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS synchronized (connection) { try { connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); - connection.setAutoCommit(true); - final PreparedStatement preparedStatement = this.writeState; - preparedStatement.setString(1, encodedState); - if (preparedStatement.executeUpdate() != 1) { - throw new IllegalStateException("can't write STATE to the JDBC Node Manager Store!"); + final boolean autoCommit = connection.getAutoCommit(); + connection.setAutoCommit(false); + try { + final PreparedStatement preparedStatement = this.writeState; + preparedStatement.setString(1, encodedState); + if (preparedStatement.executeUpdate() != 1) { + throw new IllegalStateException("can't write state to the JDBC Node Manager Store!"); + } + connection.commit(); + } catch (SQLException ie) { + connection.rollback(); + connection.setAutoCommit(true); + throw new IllegalStateException(ie); + } finally { + connection.setAutoCommit(autoCommit); } } catch (SQLException e) { throw new IllegalStateException(e); @@ -408,17 +371,15 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS @Override public void stop() throws SQLException { //release all the managed resources inside the connection lock - if (sqlProvider.closeConnectionOnShutdown()) { - synchronized (connection) { - this.readNodeId.close(); - this.writeNodeId.close(); - this.initializeNodeId.close(); - this.readState.close(); - this.writeState.close(); - this.liveLock.close(); - this.backupLock.close(); - super.stop(); - } + synchronized (connection) { + this.readNodeId.close(); + this.writeNodeId.close(); + this.initializeNodeId.close(); + this.readState.close(); + this.writeState.close(); + this.liveLock.close(); + this.backupLock.close(); + super.stop(); } } diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 2c24047a07..32e68e02fb 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -1970,13 +1970,6 @@ - - - - The absolute time in milliseconds the system clock is allowed to be distant from the DB time - - - diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java index 2ca08d4b28..84d3dd4f5a 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java @@ -71,8 +71,7 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase { UUID.randomUUID().toString(), jdbcSharedStateManager.getConnection(), sqlProvider, - acquireMillis, - jdbcSharedStateManager.timeDifferenceMillisFromDb()); + acquireMillis); } catch (SQLException e) { throw new IllegalStateException(e); } @@ -100,7 +99,6 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase { .usingConnectionUrl( UUID.randomUUID().toString(), dbConf.getJdbcLockExpirationMillis(), - dbConf.getJdbcMaxAllowedMillisFromDbTime(), dbConf.getJdbcConnectionUrl(), dbConf.getJdbcDriverClassName(), sqlProvider); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManagerTest.java index e7ac316f97..7340026768 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManagerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManagerTest.java @@ -52,7 +52,6 @@ public class JdbcSharedStateManagerTest extends ActiveMQTestBase { return JdbcSharedStateManager.usingConnectionUrl( UUID.randomUUID().toString(), dbConf.getJdbcLockExpirationMillis(), - dbConf.getJdbcMaxAllowedMillisFromDbTime(), dbConf.getJdbcConnectionUrl(), dbConf.getJdbcDriverClassName(), sqlProvider); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index acc55f91a9..f51d0f8236 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -479,7 +479,6 @@ public abstract class ActiveMQTestBase extends Assert { dbStorageConfiguration.setJdbcLockAcquisitionTimeoutMillis(getJdbcLockAcquisitionTimeoutMillis()); dbStorageConfiguration.setJdbcLockExpirationMillis(getJdbcLockExpirationMillis()); dbStorageConfiguration.setJdbcLockRenewPeriodMillis(getJdbcLockRenewPeriodMillis()); - dbStorageConfiguration.setJdbcMaxAllowedMillisFromDbTime(getJdbcMaxAllowedMillisFromDbTime()); return dbStorageConfiguration; } @@ -495,10 +494,6 @@ public abstract class ActiveMQTestBase extends Assert { return Long.getLong("jdbc.lock.renew", ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis()); } - protected long getJdbcMaxAllowedMillisFromDbTime() { - return Long.getLong("jdbc.max.diff.db", ActiveMQDefaultConfiguration.getDefaultJdbcMaxAllowedMillisFromDbTime()); - } - public void destroyTables(List tableNames) throws Exception { Driver driver = getDriver(getJDBCClassName()); Connection connection = driver.connect(getTestJDBCConnectionUrl(), null); diff --git a/docs/user-manual/en/persistence.md b/docs/user-manual/en/persistence.md index 11396adae3..76bb77ce3e 100644 --- a/docs/user-manual/en/persistence.md +++ b/docs/user-manual/en/persistence.md @@ -468,10 +468,6 @@ To configure Apache ActiveMQ Artemis to use a database for persisting messages a The time in milliseconds a JDBC lock is considered valid without keeping it alive. The default value is 20000 milliseconds (ie 20 seconds). -- `jdbc-max-allowed-millis-from-db-time` - - The absolute time in milliseconds the system clock is allowed to be distant from the DB time, otherwise a critical error will be raised. The default value is 60000 milliseconds (ie 60 seconds). - Note that some DBMS (e.g. Oracle, 30 chars) have restrictions on the size of table names, this should be taken into consideration when configuring table names for the Artemis database store, pay particular attention to the page store table name, which can be appended with a unique ID of up to 20 characters. (for Oracle this would mean configuring a page-store-table-name of max size of 10 chars). ## Zero Persistence