ARTEMIS-1447 JDBC NodeManager to support JDBC HA Shared Store

(cherry picked from commit 7944a25269d939791bfbc2637e3c649a9137ad45)
This commit is contained in:
Francesco Nigro 2017-09-09 18:41:30 +02:00 committed by Clebert Suconic
parent b267e8a741
commit 565b817592
18 changed files with 1890 additions and 11 deletions

View File

@ -437,9 +437,18 @@ public final class ActiveMQDefaultConfiguration {
// Default Page Store table name, used with Database storage type
private static final String DEFAULT_PAGE_STORE_TABLE_NAME = "PAGE_STORE";
// Default node manager store table name, used with Database storage type
private static final String DEFAULT_NODE_MANAGER_STORE_TABLE_NAME = "NODE_MANAGER_STORE";
private static final int DEFAULT_JDBC_NETWORK_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(20);
private static final long DEFAULT_JDBC_LOCK_RENEW_PERIOD_MILLIS = TimeUnit.SECONDS.toMillis(4);
private static final long DEFAULT_JDBC_LOCK_EXPIRATION_MILLIS = TimeUnit.SECONDS.toMillis(20);
private static final long DEFAULT_JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60);
// Default JMS Bingings table name, used with Database storage type
private static final String DEFAULT_JMS_BINDINGS_TABLE_NAME = "JMS_BINDINGS";
@ -1197,10 +1206,26 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_PAGE_STORE_TABLE_NAME;
}
public static String getDefaultNodeManagerStoreTableName() {
return DEFAULT_NODE_MANAGER_STORE_TABLE_NAME;
}
public static int getDefaultJdbcNetworkTimeout() {
return DEFAULT_JDBC_NETWORK_TIMEOUT;
}
public static long getDefaultJdbcLockRenewPeriodMillis() {
return DEFAULT_JDBC_LOCK_RENEW_PERIOD_MILLIS;
}
public static long getDefaultJdbcLockExpirationMillis() {
return DEFAULT_JDBC_LOCK_EXPIRATION_MILLIS;
}
public static long getDefaultJdbcLockAcquisitionTimeoutMillis() {
return DEFAULT_JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS;
}
public static String getDefaultJMSBindingsTableName() {
return DEFAULT_JMS_BINDINGS_TABLE_NAME;
}

View File

@ -18,6 +18,14 @@ package org.apache.activemq.artemis.jdbc.store.sql;
public class GenericSQLProvider implements SQLProvider {
/**
* The JDBC Node Manager shared state is contained in these 4 rows: each one is used exclusively for a specific purpose.
*/
private static final int STATE_ROW_ID = 0;
private static final int LIVE_LOCK_ROW_ID = 1;
private static final int BACKUP_LOCK_ROW_ID = 2;
private static final int NODE_ID_ROW_ID = 3;
// Default to lowest (MYSQL = 64k)
private static final long MAX_BLOB_SIZE = 64512;
@ -57,6 +65,42 @@ public class GenericSQLProvider implements SQLProvider {
private final String countJournalRecordsSQL;
private final String createNodeManagerStoreTableSQL;
private final String createStateSQL;
private final String createNodeIdSQL;
private final String createLiveLockSQL;
private final String createBackupLockSQL;
private final String tryAcquireLiveLockSQL;
private final String tryAcquireBackupLockSQL;
private final String tryReleaseLiveLockSQL;
private final String tryReleaseBackupLockSQL;
private final String isLiveLockedSQL;
private final String isBackupLockedSQL;
private final String renewLiveLockSQL;
private final String renewBackupLockSQL;
private final String currentTimestampSQL;
private final String writeStateSQL;
private final String readStateSQL;
private final String writeNodeIdSQL;
private final String readNodeIdSQL;
protected final DatabaseStoreType databaseStoreType;
protected GenericSQLProvider(String tableName, DatabaseStoreType databaseStoreType) {
@ -64,8 +108,7 @@ public class GenericSQLProvider implements SQLProvider {
this.databaseStoreType = databaseStoreType;
createFileTableSQL = "CREATE TABLE " + tableName +
"(ID BIGINT AUTO_INCREMENT, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))";
createFileTableSQL = "CREATE TABLE " + tableName + "(ID BIGINT AUTO_INCREMENT, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))";
insertFileSQL = "INSERT INTO " + tableName + " (FILENAME, EXTENSION, DATA) VALUES (?,?,?)";
@ -81,17 +124,13 @@ public class GenericSQLProvider implements SQLProvider {
updateFileNameByIdSQL = "UPDATE " + tableName + " SET FILENAME=? WHERE ID=?";
cloneFileRecordSQL = "INSERT INTO " + tableName + "(FILENAME, EXTENSION, DATA) " +
"(SELECT FILENAME, EXTENSION, DATA FROM " + tableName + " WHERE ID=?)";
cloneFileRecordSQL = "INSERT INTO " + tableName + "(FILENAME, EXTENSION, DATA) " + "(SELECT FILENAME, EXTENSION, DATA FROM " + tableName + " WHERE ID=?)";
copyFileRecordByIdSQL = "UPDATE " + tableName + " SET DATA = (SELECT DATA FROM " + tableName + " WHERE ID=?) WHERE ID=?";
dropFileTableSQL = "DROP TABLE " + tableName;
createJournalTableSQL = new String[] {
"CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BLOB,txDataSize INTEGER,txData BLOB,txCheckNoRecords INTEGER,seq BIGINT NOT NULL, PRIMARY KEY(seq))",
"CREATE INDEX " + tableName + "_IDX ON " + tableName + " (id)"
};
createJournalTableSQL = new String[]{"CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BLOB,txDataSize INTEGER,txData BLOB,txCheckNoRecords INTEGER,seq BIGINT NOT NULL, PRIMARY KEY(seq))", "CREATE INDEX " + tableName + "_IDX ON " + tableName + " (id)"};
insertJournalRecordsSQL = "INSERT INTO " + tableName + "(id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,seq) " + "VALUES (?,?,?,?,?,?,?,?,?,?,?)";
@ -102,6 +141,43 @@ public class GenericSQLProvider implements SQLProvider {
deleteJournalTxRecordsSQL = "DELETE FROM " + tableName + " WHERE txId=?";
countJournalRecordsSQL = "SELECT COUNT(*) FROM " + tableName;
createNodeManagerStoreTableSQL = "CREATE TABLE " + tableName + " ( ID INT NOT NULL, HOLDER_ID VARCHAR(128), HOLDER_EXPIRATION_TIME TIMESTAMP, NODE_ID CHAR(36),STATE CHAR(1), PRIMARY KEY(ID))";
createStateSQL = "INSERT INTO " + tableName + " (ID) VALUES (" + STATE_ROW_ID + ")";
createNodeIdSQL = "INSERT INTO " + tableName + " (ID) VALUES (" + NODE_ID_ROW_ID + ")";
createLiveLockSQL = "INSERT INTO " + tableName + " (ID) VALUES (" + LIVE_LOCK_ROW_ID + ")";
createBackupLockSQL = "INSERT INTO " + tableName + " (ID) VALUES (" + BACKUP_LOCK_ROW_ID + ")";
tryAcquireLiveLockSQL = "UPDATE " + tableName + " SET HOLDER_ID = ?, HOLDER_EXPIRATION_TIME = ? WHERE (HOLDER_EXPIRATION_TIME IS NULL OR HOLDER_EXPIRATION_TIME < CURRENT_TIMESTAMP) AND ID = " + LIVE_LOCK_ROW_ID;
tryAcquireBackupLockSQL = "UPDATE " + tableName + " SET HOLDER_ID = ?, HOLDER_EXPIRATION_TIME = ? WHERE (HOLDER_EXPIRATION_TIME IS NULL OR HOLDER_EXPIRATION_TIME < CURRENT_TIMESTAMP) AND ID = " + BACKUP_LOCK_ROW_ID;
tryReleaseLiveLockSQL = "UPDATE " + tableName + " SET HOLDER_ID = NULL, HOLDER_EXPIRATION_TIME = NULL WHERE HOLDER_ID = ? AND ID = " + LIVE_LOCK_ROW_ID;
tryReleaseBackupLockSQL = "UPDATE " + tableName + " SET HOLDER_ID = NULL, HOLDER_EXPIRATION_TIME = NULL WHERE HOLDER_ID = ? AND ID = " + BACKUP_LOCK_ROW_ID;
isLiveLockedSQL = "SELECT HOLDER_ID, HOLDER_EXPIRATION_TIME FROM " + tableName + " WHERE ID = " + LIVE_LOCK_ROW_ID;
isBackupLockedSQL = "SELECT HOLDER_ID, HOLDER_EXPIRATION_TIME FROM " + tableName + " WHERE ID = " + BACKUP_LOCK_ROW_ID;
renewLiveLockSQL = "UPDATE " + tableName + " SET HOLDER_EXPIRATION_TIME = ? WHERE HOLDER_ID = ? AND ID = " + LIVE_LOCK_ROW_ID;
renewBackupLockSQL = "UPDATE " + tableName + " SET HOLDER_EXPIRATION_TIME = ? WHERE HOLDER_ID = ? AND ID = " + BACKUP_LOCK_ROW_ID;
currentTimestampSQL = "SELECT CURRENT_TIMESTAMP FROM " + tableName;
writeStateSQL = "UPDATE " + tableName + " SET STATE = ? WHERE ID = " + STATE_ROW_ID;
readStateSQL = "SELECT STATE FROM " + tableName + " WHERE ID = " + STATE_ROW_ID;
writeNodeIdSQL = "UPDATE " + tableName + " SET NODE_ID = ? WHERE ID = " + NODE_ID_ROW_ID;
readNodeIdSQL = "SELECT NODE_ID FROM " + tableName + " WHERE ID = " + NODE_ID_ROW_ID;
}
@Override
@ -201,6 +277,96 @@ public class GenericSQLProvider implements SQLProvider {
return dropFileTableSQL;
}
@Override
public String createNodeManagerStoreTableSQL() {
return createNodeManagerStoreTableSQL;
}
@Override
public String createStateSQL() {
return createStateSQL;
}
@Override
public String createNodeIdSQL() {
return createNodeIdSQL;
}
@Override
public String createLiveLockSQL() {
return createLiveLockSQL;
}
@Override
public String createBackupLockSQL() {
return createBackupLockSQL;
}
@Override
public String tryAcquireLiveLockSQL() {
return tryAcquireLiveLockSQL;
}
@Override
public String tryAcquireBackupLockSQL() {
return tryAcquireBackupLockSQL;
}
@Override
public String tryReleaseLiveLockSQL() {
return tryReleaseLiveLockSQL;
}
@Override
public String tryReleaseBackupLockSQL() {
return tryReleaseBackupLockSQL;
}
@Override
public String isLiveLockedSQL() {
return isLiveLockedSQL;
}
@Override
public String isBackupLockedSQL() {
return isBackupLockedSQL;
}
@Override
public String renewLiveLockSQL() {
return renewLiveLockSQL;
}
@Override
public String renewBackupLockSQL() {
return renewBackupLockSQL;
}
@Override
public String currentTimestampSQL() {
return currentTimestampSQL;
}
@Override
public String writeStateSQL() {
return writeStateSQL;
}
@Override
public String readStateSQL() {
return readStateSQL;
}
@Override
public String writeNodeIdSQL() {
return writeNodeIdSQL;
}
@Override
public String readNodeIdSQL() {
return readNodeIdSQL;
}
@Override
public boolean closeConnectionOnShutdown() {
return true;

View File

@ -19,7 +19,7 @@ package org.apache.activemq.artemis.jdbc.store.sql;
public interface SQLProvider {
enum DatabaseStoreType {
PAGE, MESSAGE_JOURNAL, BINDINGS_JOURNAL, LARGE_MESSAGE
PAGE, MESSAGE_JOURNAL, BINDINGS_JOURNAL, LARGE_MESSAGE, NODE_MANAGER
}
long getMaxBlobSize();
@ -62,7 +62,44 @@ public interface SQLProvider {
boolean closeConnectionOnShutdown();
String createNodeManagerStoreTableSQL();
String createStateSQL();
String createNodeIdSQL();
String createLiveLockSQL();
String createBackupLockSQL();
String tryAcquireLiveLockSQL();
String tryAcquireBackupLockSQL();
String tryReleaseLiveLockSQL();
String tryReleaseBackupLockSQL();
String isLiveLockedSQL();
String isBackupLockedSQL();
String renewLiveLockSQL();
String renewBackupLockSQL();
String currentTimestampSQL();
String writeStateSQL();
String readStateSQL();
String writeNodeIdSQL();
String readNodeIdSQL();
interface Factory {
SQLProvider create(String tableName, DatabaseStoreType dbStoreType);
}
}

View File

@ -124,7 +124,12 @@
<scope>test</scope>
<type>test-jar</type>
</dependency>
<!-- db test -->
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>

View File

@ -34,6 +34,8 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
private String jmsBindingsTableName = ActiveMQDefaultConfiguration.getDefaultJMSBindingsTableName();
private String nodeManagerStoreTableName = ActiveMQDefaultConfiguration.getDefaultNodeManagerStoreTableName();
private String jdbcConnectionUrl = ActiveMQDefaultConfiguration.getDefaultDatabaseUrl();
private String jdbcDriverClassName = ActiveMQDefaultConfiguration.getDefaultDriverClassName();
@ -44,6 +46,12 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
private int jdbcNetworkTimeout = ActiveMQDefaultConfiguration.getDefaultJdbcNetworkTimeout();
private long jdbcLockRenewPeriodMillis = ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis();
private long jdbcLockExpirationMillis = ActiveMQDefaultConfiguration.getDefaultJdbcLockExpirationMillis();
private long jdbcLockAcquisitionTimeoutMillis = ActiveMQDefaultConfiguration.getDefaultJdbcLockAcquisitionTimeoutMillis();
@Override
public StoreType getStoreType() {
return StoreType.DATABASE;
@ -77,6 +85,14 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
return pageStoreTableName;
}
public void setNodeManagerStoreTableName(String nodeManagerStoreTableName) {
this.nodeManagerStoreTableName = nodeManagerStoreTableName;
}
public String getNodeManagerStoreTableName() {
return nodeManagerStoreTableName;
}
public void setPageStoreTableName(String pageStoreTableName) {
this.pageStoreTableName = pageStoreTableName;
}
@ -145,4 +161,28 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
public void setJdbcNetworkTimeout(int jdbcNetworkTimeout) {
this.jdbcNetworkTimeout = jdbcNetworkTimeout;
}
public long getJdbcLockRenewPeriodMillis() {
return jdbcLockRenewPeriodMillis;
}
public void setJdbcLockRenewPeriodMillis(long jdbcLockRenewPeriodMillis) {
this.jdbcLockRenewPeriodMillis = jdbcLockRenewPeriodMillis;
}
public long getJdbcLockExpirationMillis() {
return jdbcLockExpirationMillis;
}
public void setJdbcLockExpirationMillis(long jdbcLockExpirationMillis) {
this.jdbcLockExpirationMillis = jdbcLockExpirationMillis;
}
public long getJdbcLockAcquisitionTimeoutMillis() {
return jdbcLockAcquisitionTimeoutMillis;
}
public void setJdbcLockAcquisitionTimeoutMillis(long jdbcLockAcquisitionTimeoutMillis) {
this.jdbcLockAcquisitionTimeoutMillis = jdbcLockAcquisitionTimeoutMillis;
}
}

View File

@ -1164,6 +1164,9 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
conf.setJdbcConnectionUrl(getString(storeNode, "jdbc-connection-url", conf.getJdbcConnectionUrl(), Validators.NO_CHECK));
conf.setJdbcDriverClassName(getString(storeNode, "jdbc-driver-class-name", conf.getJdbcDriverClassName(), Validators.NO_CHECK));
conf.setJdbcNetworkTimeout(getInteger(storeNode, "jdbc-network-timeout", conf.getJdbcNetworkTimeout(), Validators.NO_CHECK));
conf.setJdbcLockAcquisitionTimeoutMillis(getLong(storeNode, "jdbc-lock-acquisition-timeout", conf.getJdbcLockAcquisitionTimeoutMillis(), Validators.NO_CHECK));
conf.setJdbcLockRenewPeriodMillis(getLong(storeNode, "jdbc-lock-renew-period", conf.getJdbcLockRenewPeriodMillis(), Validators.NO_CHECK));
conf.setJdbcLockExpirationMillis(getLong(storeNode, "jdbc-lock-expiration", conf.getJdbcLockExpirationMillis(), Validators.NO_CHECK));
return conf;
}

View File

@ -127,7 +127,7 @@ public abstract class NodeManager implements ActiveMQComponent {
isStarted = false;
}
public final void stopBackup() throws Exception {
public void stopBackup() throws Exception {
if (replicatedBackup && getNodeId() != null) {
setUpServerLockFile();
}

View File

@ -138,6 +138,7 @@ import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
import org.apache.activemq.artemis.core.server.group.impl.LocalGroupingHandler;
import org.apache.activemq.artemis.core.server.group.impl.RemoteGroupingHandler;
import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl;
import org.apache.activemq.artemis.core.server.reload.ReloadCallback;
@ -448,6 +449,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
NodeManager manager;
if (!configuration.isPersistenceEnabled()) {
manager = new InVMNodeManager(replicatingBackup);
} else if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
if (replicatingBackup) {
throw new IllegalArgumentException("replicatingBackup is not supported yet while using JDBC persistence");
}
final DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) configuration.getStoreConfiguration();
manager = JdbcNodeManager.with(dbConf, scheduledPool, executorFactory, shutdownOnCriticalIO);
} else {
manager = new FileLockNodeManager(directory, replicatingBackup, configuration.getJournalLockAcquisitionTimeout());
}

View File

@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl.jdbc;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.jboss.logging.Logger;
/**
* Default implementation of a {@link ScheduledLeaseLock}: see {@link ScheduledLeaseLock#of(ScheduledExecutorService, ArtemisExecutor, String, LeaseLock, long, IOCriticalErrorListener)}.
*/
final class ActiveMQScheduledLeaseLock extends ActiveMQScheduledComponent implements ScheduledLeaseLock {
private static final Logger LOGGER = Logger.getLogger(ActiveMQScheduledLeaseLock.class);
private final String lockName;
private final LeaseLock lock;
private long lastLockRenewStart;
private final long renewPeriodMillis;
private final IOCriticalErrorListener ioCriticalErrorListener;
ActiveMQScheduledLeaseLock(ScheduledExecutorService scheduledExecutorService,
ArtemisExecutor executor,
String lockName,
LeaseLock lock,
long renewPeriodMillis,
IOCriticalErrorListener ioCriticalErrorListener) {
super(scheduledExecutorService, executor, 0, renewPeriodMillis, TimeUnit.MILLISECONDS, false);
if (renewPeriodMillis >= lock.expirationMillis()) {
throw new IllegalArgumentException("renewPeriodMillis must be < lock's expirationMillis");
}
this.lockName = lockName;
this.lock = lock;
this.renewPeriodMillis = renewPeriodMillis;
//already expired start time
this.lastLockRenewStart = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(lock.expirationMillis());
this.ioCriticalErrorListener = ioCriticalErrorListener;
}
@Override
public long renewPeriodMillis() {
return renewPeriodMillis;
}
@Override
public LeaseLock lock() {
return lock;
}
@Override
public synchronized void start() {
if (isStarted()) {
return;
}
this.lastLockRenewStart = System.nanoTime();
super.start();
}
@Override
public synchronized void stop() {
if (!isStarted()) {
return;
}
super.stop();
}
@Override
public void run() {
final long lastRenewStart = this.lastLockRenewStart;
final long renewStart = System.nanoTime();
if (!this.lock.renew()) {
ioCriticalErrorListener.onIOException(new IllegalStateException(lockName + " lock can't be renewed"), "Critical error while on " + lockName + " renew", null);
}
//logic to detect slowness of DB and/or the scheduled executor service
detectAndReportRenewSlowness(lockName, lastRenewStart, renewStart, renewPeriodMillis, lock.expirationMillis());
this.lastLockRenewStart = lastRenewStart;
}
private static void detectAndReportRenewSlowness(String lockName,
long lastRenewStart,
long renewStart,
long expectedRenewPeriodMillis,
long expirationMillis) {
final long elapsedMillisToRenew = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - renewStart);
if (elapsedMillisToRenew > expectedRenewPeriodMillis) {
LOGGER.error(lockName + " lock renew tooks " + elapsedMillisToRenew + " ms, while is supposed to take <" + expectedRenewPeriodMillis + " ms");
}
final long measuredRenewPeriodNanos = renewStart - lastRenewStart;
final long measuredRenewPeriodMillis = TimeUnit.NANOSECONDS.toMillis(measuredRenewPeriodNanos);
if (measuredRenewPeriodMillis > expirationMillis) {
LOGGER.error(lockName + " lock renew period lasted " + measuredRenewPeriodMillis + " ms instead of " + expectedRenewPeriodMillis + " ms");
} else if (measuredRenewPeriodMillis > expectedRenewPeriodMillis) {
LOGGER.warn(lockName + " lock renew period lasted " + measuredRenewPeriodMillis + " ms instead of " + expectedRenewPeriodMillis + " ms");
}
}
}

View File

@ -0,0 +1,277 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl.jdbc;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Objects;
import java.util.function.Predicate;
import org.jboss.logging.Logger;
/**
* JDBC implementation of a {@link LeaseLock} with a {@code String} defined {@link #holderId()}.
*/
final class JdbcLeaseLock implements LeaseLock {
private static final Logger LOGGER = Logger.getLogger(JdbcLeaseLock.class);
private static final int MAX_HOLDER_ID_LENGTH = 128;
private final Connection connection;
private final long maxAllowableMillisDiffFromDBTime;
private long millisDiffFromCurrentTime;
private final String holderId;
private final PreparedStatement tryAcquireLock;
private final PreparedStatement tryReleaseLock;
private final PreparedStatement renewLock;
private final PreparedStatement isLocked;
private final PreparedStatement currentDateTime;
private final long expirationMillis;
private boolean maybeAcquired;
/**
* The lock will be responsible (ie {@link #close()}) of all the {@link PreparedStatement}s used by it, but not of the {@link Connection},
* whose life cycle will be managed externally.
*/
JdbcLeaseLock(String holderId,
Connection connection,
PreparedStatement tryAcquireLock,
PreparedStatement tryReleaseLock,
PreparedStatement renewLock,
PreparedStatement isLocked,
PreparedStatement currentDateTime,
long expirationMIllis,
long maxAllowableMillisDiffFromDBTime) {
if (holderId.length() > MAX_HOLDER_ID_LENGTH) {
throw new IllegalArgumentException("holderId length must be <=" + MAX_HOLDER_ID_LENGTH);
}
this.holderId = holderId;
this.maxAllowableMillisDiffFromDBTime = maxAllowableMillisDiffFromDBTime;
this.millisDiffFromCurrentTime = Long.MAX_VALUE;
this.tryAcquireLock = tryAcquireLock;
this.tryReleaseLock = tryReleaseLock;
this.renewLock = renewLock;
this.isLocked = isLocked;
this.currentDateTime = currentDateTime;
this.expirationMillis = expirationMIllis;
this.maybeAcquired = false;
this.connection = connection;
}
public String holderId() {
return holderId;
}
@Override
public long expirationMillis() {
return expirationMillis;
}
private long timeDifference() throws SQLException {
if (Long.MAX_VALUE == millisDiffFromCurrentTime) {
if (maxAllowableMillisDiffFromDBTime > 0) {
millisDiffFromCurrentTime = determineTimeDifference();
} else {
millisDiffFromCurrentTime = 0L;
}
}
return millisDiffFromCurrentTime;
}
private long determineTimeDifference() throws SQLException {
try (ResultSet resultSet = currentDateTime.executeQuery()) {
long result = 0L;
if (resultSet.next()) {
final Timestamp timestamp = resultSet.getTimestamp(1);
final long diff = System.currentTimeMillis() - timestamp.getTime();
if (Math.abs(diff) > maxAllowableMillisDiffFromDBTime) {
// off by more than maxAllowableMillisDiffFromDBTime so lets adjust
result = (-diff);
}
LOGGER.info(holderId() + " diff adjust from db: " + result + ", db time: " + timestamp);
}
return result;
}
}
@Override
public boolean renew() {
synchronized (connection) {
try {
final boolean result;
connection.setAutoCommit(false);
try {
final long timeDifference = timeDifference();
final PreparedStatement preparedStatement = this.renewLock;
final long now = System.currentTimeMillis() + timeDifference;
final Timestamp timestamp = new Timestamp(now + expirationMillis);
preparedStatement.setTimestamp(1, timestamp);
preparedStatement.setString(2, holderId);
result = preparedStatement.executeUpdate() == 1;
} catch (SQLException ie) {
connection.rollback();
connection.setAutoCommit(true);
throw new IllegalStateException(ie);
}
connection.commit();
connection.setAutoCommit(true);
return result;
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
}
@Override
public boolean tryAcquire() {
synchronized (connection) {
try {
final boolean acquired;
connection.setAutoCommit(false);
try {
final long timeDifference = timeDifference();
final PreparedStatement preparedStatement = tryAcquireLock;
final long now = System.currentTimeMillis() + timeDifference;
preparedStatement.setString(1, holderId);
final Timestamp timestamp = new Timestamp(now + expirationMillis);
preparedStatement.setTimestamp(2, timestamp);
acquired = preparedStatement.executeUpdate() == 1;
} catch (SQLException ie) {
connection.rollback();
connection.setAutoCommit(true);
throw new IllegalStateException(ie);
}
connection.commit();
connection.setAutoCommit(true);
if (acquired) {
this.maybeAcquired = true;
}
return acquired;
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
}
@Override
public boolean isHeld() {
return checkValidHolderId(Objects::nonNull);
}
@Override
public boolean isHeldByCaller() {
return checkValidHolderId(this.holderId::equals);
}
private boolean checkValidHolderId(Predicate<? super String> holderIdFilter) {
synchronized (connection) {
try {
boolean result;
connection.setAutoCommit(false);
try {
final long timeDifference = timeDifference();
final PreparedStatement preparedStatement = this.isLocked;
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (!resultSet.next()) {
result = false;
} else {
final String currentHolderId = resultSet.getString(1);
result = holderIdFilter.test(currentHolderId);
//warn about any zombie lock
final Timestamp timestamp = resultSet.getTimestamp(2);
if (timestamp != null) {
final long lockExpirationTime = timestamp.getTime();
final long now = System.currentTimeMillis() + timeDifference;
final long expiredBy = now - lockExpirationTime;
if (expiredBy > 0) {
result = false;
LOGGER.warn("found zombie lock with holderId: " + currentHolderId + " expired by: " + expiredBy + " ms");
}
}
}
}
} catch (SQLException ie) {
connection.rollback();
connection.setAutoCommit(true);
throw new IllegalStateException(ie);
}
connection.commit();
connection.setAutoCommit(true);
return result;
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
}
@Override
public void release() {
synchronized (connection) {
try {
connection.setAutoCommit(false);
try {
final PreparedStatement preparedStatement = this.tryReleaseLock;
preparedStatement.setString(1, holderId);
if (preparedStatement.executeUpdate() != 1) {
LOGGER.warn(holderId + " has failed to release a lock");
} else {
LOGGER.info(holderId + " has released a lock");
}
//consider it as released to avoid on finalize to be reclaimed
this.maybeAcquired = false;
} catch (SQLException ie) {
connection.rollback();
connection.setAutoCommit(true);
throw new IllegalStateException(ie);
}
connection.commit();
connection.setAutoCommit(true);
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
}
@Override
public void close() throws SQLException {
synchronized (connection) {
//to avoid being called if not needed
if (!this.tryReleaseLock.isClosed()) {
try {
if (this.maybeAcquired) {
release();
}
} finally {
this.tryReleaseLock.close();
this.tryAcquireLock.close();
this.renewLock.close();
this.isLocked.close();
this.currentDateTime.close();
}
}
}
}
@Override
protected void finalize() throws Throwable {
close();
}
}

View File

@ -0,0 +1,380 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl.jdbc;
import javax.sql.DataSource;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.jboss.logging.Logger;
/**
* JDBC implementation of {@link NodeManager}.
*/
public final class JdbcNodeManager extends NodeManager {
private static final Logger logger = Logger.getLogger(JdbcNodeManager.class);
private static final long MAX_PAUSE_MILLIS = 2000L;
private final SharedStateManager sharedStateManager;
private final ScheduledLeaseLock scheduledLiveLock;
private final ScheduledLeaseLock scheduledBackupLock;
private final long lockRenewPeriodMillis;
private final long lockAcquisitionTimeoutMillis;
private volatile boolean interrupted = false;
private final LeaseLock.Pauser pauser;
private final IOCriticalErrorListener ioCriticalErrorListener;
public static JdbcNodeManager with(DatabaseStorageConfiguration configuration,
ScheduledExecutorService scheduledExecutorService,
ExecutorFactory executorFactory,
IOCriticalErrorListener ioCriticalErrorListener) {
if (configuration.getDataSource() != null) {
final String brokerId = java.util.UUID.randomUUID().toString();
return usingDataSource(brokerId, configuration.getJdbcLockExpirationMillis(), configuration.getJdbcLockRenewPeriodMillis(), configuration.getJdbcLockAcquisitionTimeoutMillis(), configuration.getDataSource(), configuration.getSqlProviderFactory().create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER), scheduledExecutorService, executorFactory, ioCriticalErrorListener);
} else {
final String brokerId = java.util.UUID.randomUUID().toString();
return usingConnectionUrl(brokerId, configuration.getJdbcLockExpirationMillis(), configuration.getJdbcLockRenewPeriodMillis(), configuration.getJdbcLockAcquisitionTimeoutMillis(), configuration.getJdbcConnectionUrl(), configuration.getJdbcDriverClassName(), configuration.getSqlProviderFactory().create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER), scheduledExecutorService, executorFactory, ioCriticalErrorListener);
}
}
static JdbcNodeManager usingDataSource(String brokerId,
long lockExpirationMillis,
long lockRenewPeriodMillis,
long lockAcquisitionTimeoutMillis,
DataSource dataSource,
SQLProvider provider,
ScheduledExecutorService scheduledExecutorService,
ExecutorFactory executorFactory,
IOCriticalErrorListener ioCriticalErrorListener) {
return new JdbcNodeManager(JdbcSharedStateManager.usingDataSource(brokerId, lockExpirationMillis, dataSource, provider), false, lockRenewPeriodMillis, lockAcquisitionTimeoutMillis, scheduledExecutorService, executorFactory, ioCriticalErrorListener);
}
public static JdbcNodeManager usingConnectionUrl(String brokerId,
long lockExpirationMillis,
long lockRenewPeriodMillis,
long lockAcquisitionTimeoutMillis,
String jdbcUrl,
String driverClass,
SQLProvider provider,
ScheduledExecutorService scheduledExecutorService,
ExecutorFactory executorFactory,
IOCriticalErrorListener ioCriticalErrorListener) {
return new JdbcNodeManager(JdbcSharedStateManager.usingConnectionUrl(brokerId, lockExpirationMillis, jdbcUrl, driverClass, provider), false, lockRenewPeriodMillis, lockAcquisitionTimeoutMillis, scheduledExecutorService, executorFactory, ioCriticalErrorListener);
}
private JdbcNodeManager(final SharedStateManager sharedStateManager,
boolean replicatedBackup,
long lockRenewPeriodMillis,
long lockAcquisitionTimeoutMillis,
ScheduledExecutorService scheduledExecutorService,
ExecutorFactory executorFactory,
IOCriticalErrorListener ioCriticalErrorListener) {
super(replicatedBackup, null);
this.lockAcquisitionTimeoutMillis = lockAcquisitionTimeoutMillis;
this.lockRenewPeriodMillis = lockRenewPeriodMillis;
this.pauser = LeaseLock.Pauser.sleep(Math.min(this.lockRenewPeriodMillis, MAX_PAUSE_MILLIS), TimeUnit.MILLISECONDS);
this.sharedStateManager = sharedStateManager;
this.scheduledLiveLock = ScheduledLeaseLock.of(scheduledExecutorService, executorFactory.getExecutor(), "live", this.sharedStateManager.liveLock(), lockRenewPeriodMillis, ioCriticalErrorListener);
this.scheduledBackupLock = ScheduledLeaseLock.of(scheduledExecutorService, executorFactory.getExecutor(), "backup", this.sharedStateManager.backupLock(), lockRenewPeriodMillis, ioCriticalErrorListener);
this.ioCriticalErrorListener = ioCriticalErrorListener;
}
@Override
public synchronized void start() throws Exception {
if (isStarted()) {
return;
}
if (!replicatedBackup) {
final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID);
setUUID(nodeId);
}
super.start();
}
@Override
public synchronized void stop() throws Exception {
if (isStarted()) {
try {
this.scheduledLiveLock.stop();
this.scheduledBackupLock.stop();
} finally {
super.stop();
this.sharedStateManager.close();
}
}
}
@Override
protected void finalize() throws Throwable {
stop();
}
@Override
public boolean isAwaitingFailback() throws Exception {
return readSharedState() == SharedStateManager.State.FAILING_BACK;
}
@Override
public boolean isBackupLive() throws Exception {
//is anyone holding the live lock?
return this.scheduledLiveLock.lock().isHeld();
}
@Override
public void stopBackup() throws Exception {
if (replicatedBackup) {
final UUID nodeId = getUUID();
sharedStateManager.writeNodeId(nodeId);
}
releaseBackup();
}
@Override
public void interrupt() {
//need to be volatile: must be called concurrently to work as expected
interrupted = true;
}
@Override
public void releaseBackup() throws Exception {
if (this.scheduledBackupLock.lock().isHeldByCaller()) {
this.scheduledBackupLock.stop();
this.scheduledBackupLock.lock().release();
}
}
/**
* Try to acquire a lock, failing with an exception otherwise.
*/
private void lock(LeaseLock lock) throws Exception {
final LeaseLock.AcquireResult acquireResult = lock.tryAcquire(this.lockAcquisitionTimeoutMillis, this.pauser, () -> !this.interrupted);
switch (acquireResult) {
case Timeout:
throw new Exception("timed out waiting for lock");
case Exit:
this.interrupted = false;
throw new InterruptedException("LeaseLock was interrupted");
case Done:
break;
default:
throw new AssertionError(acquireResult + " not managed");
}
}
private void checkInterrupted(Supplier<String> message) throws InterruptedException {
if (this.interrupted) {
interrupted = false;
throw new InterruptedException(message.get());
}
}
private void renewLiveLockIfNeeded(final long acquiredOn) {
final long acquiredMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - acquiredOn);
if (acquiredMillis > this.scheduledLiveLock.renewPeriodMillis()) {
if (!this.scheduledLiveLock.lock().renew()) {
final IllegalStateException e = new IllegalStateException("live lock can't be renewed");
try {
ioCriticalErrorListener.onIOException(e, "live lock can't be renewed", null);
} finally {
throw e;
}
}
}
}
/**
* Lock live node and check for a live state, taking care to renew it (if needed) or releasing it otherwise
*/
private boolean lockLiveAndCheckLiveState() throws Exception {
lock(this.scheduledLiveLock.lock());
final long acquiredOn = System.nanoTime();
boolean liveWhileLocked = false;
//check if the state is live
final SharedStateManager.State stateWhileLocked;
try {
stateWhileLocked = readSharedState();
} catch (Throwable t) {
logger.error("error while holding the live node lock and tried to read the shared state", t);
this.scheduledLiveLock.lock().release();
throw t;
}
if (stateWhileLocked == SharedStateManager.State.LIVE) {
renewLiveLockIfNeeded(acquiredOn);
liveWhileLocked = true;
} else {
if (logger.isDebugEnabled()) {
logger.debug("state is " + stateWhileLocked + " while holding the live lock");
}
//state is not live: can (try to) release the lock
this.scheduledLiveLock.lock().release();
}
return liveWhileLocked;
}
@Override
public void awaitLiveNode() throws Exception {
boolean liveWhileLocked = false;
while (!liveWhileLocked) {
//check first without holding any lock
final SharedStateManager.State state = readSharedState();
if (state == SharedStateManager.State.LIVE) {
//verify if the state is live while holding the live node lock too
liveWhileLocked = lockLiveAndCheckLiveState();
} else {
if (logger.isDebugEnabled()) {
logger.debug("awaiting live node...state: " + state);
}
}
if (!liveWhileLocked) {
checkInterrupted(() -> "awaitLiveNode got interrupted!");
pauser.idle();
}
}
//state is LIVE and live lock is acquired and valid
logger.debug("acquired live node lock");
this.scheduledLiveLock.start();
}
@Override
public void startBackup() throws Exception {
assert !replicatedBackup; // should not be called if this is a replicating backup
ActiveMQServerLogger.LOGGER.waitingToBecomeBackup();
lock(scheduledBackupLock.lock());
scheduledBackupLock.start();
ActiveMQServerLogger.LOGGER.gotBackupLock();
if (getUUID() == null)
readNodeId();
}
@Override
public ActivateCallback startLiveNode() throws Exception {
setFailingBack();
final String timeoutMessage = lockAcquisitionTimeoutMillis == -1 ? "indefinitely" : lockAcquisitionTimeoutMillis + " milliseconds";
ActiveMQServerLogger.LOGGER.waitingToObtainLiveLock(timeoutMessage);
lock(this.scheduledLiveLock.lock());
this.scheduledLiveLock.start();
ActiveMQServerLogger.LOGGER.obtainedLiveLock();
return new ActivateCallback() {
@Override
public void preActivate() {
}
@Override
public void activated() {
}
@Override
public void deActivate() {
}
@Override
public void activationComplete() {
try {
//state can be written only if the live renew task is running
setLive();
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
}
}
};
}
@Override
public void pauseLiveServer() throws Exception {
if (scheduledLiveLock.isStarted()) {
setPaused();
scheduledLiveLock.stop();
scheduledLiveLock.lock().release();
} else if (scheduledLiveLock.lock().renew()) {
setPaused();
scheduledLiveLock.lock().release();
} else {
final IllegalStateException e = new IllegalStateException("live lock can't be renewed");
try {
ioCriticalErrorListener.onIOException(e, "live lock can't be renewed on pauseLiveServer", null);
} finally {
throw e;
}
}
}
@Override
public void crashLiveServer() throws Exception {
if (this.scheduledLiveLock.lock().isHeldByCaller()) {
scheduledLiveLock.stop();
this.scheduledLiveLock.lock().release();
}
}
@Override
public void awaitLiveStatus() {
while (readSharedState() != SharedStateManager.State.LIVE) {
pauser.idle();
}
}
private void setLive() {
writeSharedState(SharedStateManager.State.LIVE);
}
private void setFailingBack() {
writeSharedState(SharedStateManager.State.FAILING_BACK);
}
private void setPaused() {
writeSharedState(SharedStateManager.State.PAUSED);
}
private void writeSharedState(SharedStateManager.State state) {
assert !this.replicatedBackup : "the replicated backup can't write the shared state!";
this.sharedStateManager.writeState(state);
}
private SharedStateManager.State readSharedState() {
return this.sharedStateManager.readState();
}
@Override
public SimpleString readNodeId() {
final UUID nodeId = this.sharedStateManager.readNodeId();
setUUID(nodeId);
return getNodeId();
}
}

View File

@ -0,0 +1,302 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl.jdbc;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.function.Supplier;
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.utils.UUID;
/**
* JDBC implementation of a {@link SharedStateManager}.
*/
@SuppressWarnings("SynchronizeOnNonFinalField")
final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedStateManager {
private final String holderId;
private final long lockExpirationMillis;
private JdbcLeaseLock liveLock;
private JdbcLeaseLock backupLock;
private PreparedStatement readNodeId;
private PreparedStatement writeNodeId;
private PreparedStatement readState;
private PreparedStatement writeState;
public static JdbcSharedStateManager usingDataSource(String holderId,
long locksExpirationMillis,
DataSource dataSource,
SQLProvider provider) {
final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis);
sharedStateManager.setDataSource(dataSource);
sharedStateManager.setSqlProvider(provider);
try {
sharedStateManager.start();
return sharedStateManager;
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
public static JdbcSharedStateManager usingConnectionUrl(String holderId,
long locksExpirationMillis,
String jdbcConnectionUrl,
String jdbcDriverClass,
SQLProvider provider) {
final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis);
sharedStateManager.setJdbcConnectionUrl(jdbcConnectionUrl);
sharedStateManager.setJdbcDriverClass(jdbcDriverClass);
sharedStateManager.setSqlProvider(provider);
try {
sharedStateManager.start();
return sharedStateManager;
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
@Override
protected void createSchema() throws SQLException {
try {
createTable(sqlProvider.createNodeManagerStoreTableSQL(), sqlProvider.createNodeIdSQL(), sqlProvider.createStateSQL(), sqlProvider.createLiveLockSQL(), sqlProvider.createBackupLockSQL());
} catch (SQLException e) {
//no op: if a table already exists is not a problem in this case, the prepareStatements() call will fail right after it if the table is not correctly initialized
}
}
static JdbcLeaseLock createLiveLock(String holderId,
Connection connection,
SQLProvider sqlProvider,
long expirationMillis,
long maxAllowableMillisDiffFromDBtime) throws SQLException {
return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireLiveLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseLiveLockSQL()), connection.prepareStatement(sqlProvider.renewLiveLockSQL()), connection.prepareStatement(sqlProvider.isLiveLockedSQL()), connection.prepareStatement(sqlProvider.currentTimestampSQL()), expirationMillis, maxAllowableMillisDiffFromDBtime);
}
static JdbcLeaseLock createBackupLock(String holderId,
Connection connection,
SQLProvider sqlProvider,
long expirationMillis,
long maxAllowableMillisDiffFromDBtime) throws SQLException {
return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireBackupLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseBackupLockSQL()), connection.prepareStatement(sqlProvider.renewBackupLockSQL()), connection.prepareStatement(sqlProvider.isBackupLockedSQL()), connection.prepareStatement(sqlProvider.currentTimestampSQL()), expirationMillis, maxAllowableMillisDiffFromDBtime);
}
@Override
protected void prepareStatements() throws SQLException {
this.liveLock = createLiveLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis, 0);
this.backupLock = createBackupLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis, 0);
this.readNodeId = connection.prepareStatement(sqlProvider.readNodeIdSQL());
this.writeNodeId = connection.prepareStatement(sqlProvider.writeNodeIdSQL());
this.writeState = connection.prepareStatement(sqlProvider.writeStateSQL());
this.readState = connection.prepareStatement(sqlProvider.readStateSQL());
}
private JdbcSharedStateManager(String holderId, long lockExpirationMillis) {
this.holderId = holderId;
this.lockExpirationMillis = lockExpirationMillis;
}
@Override
public LeaseLock liveLock() {
return this.liveLock;
}
@Override
public LeaseLock backupLock() {
return this.backupLock;
}
private UUID rawReadNodeId() throws SQLException {
final PreparedStatement preparedStatement = this.readNodeId;
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (!resultSet.next()) {
return null;
} else {
final String nodeId = resultSet.getString(1);
if (nodeId != null) {
return new UUID(UUID.TYPE_TIME_BASED, UUID.stringToBytes(nodeId));
} else {
return null;
}
}
}
}
@Override
public UUID readNodeId() {
synchronized (connection) {
try {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
connection.setAutoCommit(true);
final UUID nodeId = rawReadNodeId();
return nodeId;
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
}
@Override
public void writeNodeId(UUID nodeId) {
synchronized (connection) {
try {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
connection.setAutoCommit(true);
rawWriteNodeId(nodeId);
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
}
private void rawWriteNodeId(UUID nodeId) throws SQLException {
final PreparedStatement preparedStatement = this.writeNodeId;
preparedStatement.setString(1, nodeId.toString());
if (preparedStatement.executeUpdate() != 1) {
throw new IllegalStateException("can't write NODE_ID on the JDBC Node Manager Store!");
}
}
@Override
public UUID setup(Supplier<? extends UUID> nodeIdFactory) {
//uses a single transaction to make everything
synchronized (connection) {
try {
final UUID nodeId;
connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
connection.setAutoCommit(false);
try {
UUID readNodeId = rawReadNodeId();
if (readNodeId == null) {
nodeId = nodeIdFactory.get();
rawWriteNodeId(nodeId);
} else {
nodeId = readNodeId;
}
} catch (SQLException e) {
connection.rollback();
connection.setAutoCommit(true);
throw e;
}
connection.commit();
connection.setAutoCommit(true);
return nodeId;
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
}
private static State decodeState(String s) {
if (s == null) {
return State.NOT_STARTED;
}
switch (s) {
case "L":
return State.LIVE;
case "F":
return State.FAILING_BACK;
case "P":
return State.PAUSED;
case "N":
return State.NOT_STARTED;
default:
throw new IllegalStateException("unknown state [" + s + "]");
}
}
private static String encodeState(State state) {
switch (state) {
case LIVE:
return "L";
case FAILING_BACK:
return "F";
case PAUSED:
return "P";
case NOT_STARTED:
return "N";
default:
throw new IllegalStateException("unknown state [" + state + "]");
}
}
@Override
public State readState() {
synchronized (connection) {
try {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
connection.setAutoCommit(true);
final State state;
final PreparedStatement preparedStatement = this.readState;
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (!resultSet.next()) {
state = State.FIRST_TIME_START;
} else {
state = decodeState(resultSet.getString(1));
}
}
return state;
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
}
@Override
public void writeState(State state) {
final String encodedState = encodeState(state);
synchronized (connection) {
try {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
connection.setAutoCommit(true);
final PreparedStatement preparedStatement = this.writeState;
preparedStatement.setString(1, encodedState);
if (preparedStatement.executeUpdate() != 1) {
throw new IllegalStateException("can't write STATE to the JDBC Node Manager Store!");
}
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
}
@Override
public void stop() throws SQLException {
//release all the managed resources inside the connection lock
if (sqlProvider.closeConnectionOnShutdown()) {
synchronized (connection) {
this.readNodeId.close();
this.writeNodeId.close();
this.readState.close();
this.writeState.close();
this.liveLock.close();
this.backupLock.close();
super.stop();
}
}
}
@Override
public void close() throws SQLException {
stop();
}
}

View File

@ -0,0 +1,151 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl.jdbc;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
/**
* It represents a lock that can't be held more than {@link #expirationMillis()} without being renewed.
*
* <p>
* An implementor must provide implicitly the caller identity to contextualize each operation (eg {@link JdbcLeaseLock}
* uses one caller per instance)
*/
interface LeaseLock extends AutoCloseable {
enum AcquireResult {
Timeout, Exit, Done
}
interface ExitCondition {
/**
* @return true as long as we should keep running
*/
boolean keepRunning();
}
interface Pauser {
void idle();
static Pauser sleep(long idleTime, TimeUnit timeUnit) {
final long idleNanos = timeUnit.toNanos(idleTime);
//can fail spuriously but doesn't throw any InterruptedException
return () -> LockSupport.parkNanos(idleNanos);
}
static Pauser noWait() {
return () -> {
};
}
}
/**
* The expiration in milliseconds from the last valid acquisition/renew.
*/
default long expirationMillis() {
return Long.MAX_VALUE;
}
/**
* It extends the lock expiration (if held) to {@link System#currentTimeMillis()} + {@link #expirationMillis()}.
*
* @return {@code true} if the expiration has been moved on, {@code false} otherwise
*/
default boolean renew() {
return true;
}
/**
* Not reentrant lock acquisition operation.
* The lock can be acquired if is not held by anyone (including the caller) or has an expired ownership.
*
* @return {@code true} if has been acquired, {@code false} otherwise
*/
boolean tryAcquire();
/**
* Not reentrant lock acquisition operation (ie {@link #tryAcquire()}).
* It tries to acquire the lock until will succeed (ie {@link AcquireResult#Done})or got interrupted (ie {@link AcquireResult#Exit}).
* After each failed attempt is performed a {@link Pauser#idle} call.
*/
default AcquireResult tryAcquire(ExitCondition exitCondition, Pauser pauser) {
while (exitCondition.keepRunning()) {
if (tryAcquire()) {
return AcquireResult.Done;
} else {
pauser.idle();
}
}
return AcquireResult.Exit;
}
/**
* Not reentrant lock acquisition operation (ie {@link #tryAcquire()}).
* It tries to acquire the lock until will succeed (ie {@link AcquireResult#Done}), got interrupted (ie {@link AcquireResult#Exit})
* or exceed {@code tryAcquireTimeoutMillis}.
* After each failed attempt is performed a {@link Pauser#idle} call.
* If the specified timeout is <=0 then it behaves as {@link #tryAcquire(ExitCondition, Pauser)}.
*/
default AcquireResult tryAcquire(long tryAcquireTimeoutMillis, Pauser pauser, ExitCondition exitCondition) {
if (tryAcquireTimeoutMillis < 0) {
return tryAcquire(exitCondition, pauser);
}
final long timeoutInNanosecond = TimeUnit.MILLISECONDS.toNanos(tryAcquireTimeoutMillis);
final long startAcquire = System.nanoTime();
while (exitCondition.keepRunning()) {
if (tryAcquire()) {
return AcquireResult.Done;
} else if (System.nanoTime() - startAcquire >= timeoutInNanosecond) {
return AcquireResult.Timeout;
} else {
pauser.idle();
//check before doing anything if time is expired
if (System.nanoTime() - startAcquire >= timeoutInNanosecond) {
return AcquireResult.Timeout;
}
}
}
return AcquireResult.Exit;
}
/**
* @return {@code true} if there is a valid (ie not expired) owner, {@code false} otherwise
*/
boolean isHeld();
/**
* @return {@code true} if the caller is a valid (ie not expired) owner, {@code false} otherwise
*/
boolean isHeldByCaller();
/**
* It release the lock itself and all the resources used by it (eg connections, file handlers)
*/
@Override
default void close() throws Exception {
release();
}
/**
* Perform the release if this lock is held by the caller.
*/
void release();
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl.jdbc;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
/**
* {@link LeaseLock} holder that allows to schedule a {@link LeaseLock#renew} task with a fixed {@link #renewPeriodMillis()} delay.
*/
interface ScheduledLeaseLock extends ActiveMQComponent {
LeaseLock lock();
long renewPeriodMillis();
static ScheduledLeaseLock of(ScheduledExecutorService scheduledExecutorService,
ArtemisExecutor executor,
String lockName,
LeaseLock lock,
long renewPeriodMillis,
IOCriticalErrorListener ioCriticalErrorListener) {
return new ActiveMQScheduledLeaseLock(scheduledExecutorService, executor, lockName, lock, renewPeriodMillis, ioCriticalErrorListener);
}
}

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl.jdbc;
import java.util.function.Supplier;
import org.apache.activemq.artemis.utils.UUID;
/**
* Facade to abstract the operations on the shared state (inter-process and/or inter-thread) necessary to coordinate broker nodes.
*/
interface SharedStateManager extends AutoCloseable {
enum State {
LIVE, PAUSED, FAILING_BACK, NOT_STARTED, FIRST_TIME_START
}
LeaseLock liveLock();
LeaseLock backupLock();
UUID readNodeId();
void writeNodeId(UUID nodeId);
/**
* Purpose of this method is to setup the environment to provide a shared state between live/backup servers.
* That means:
* - check if a shared state exist and create it/wait for it if not
* - check if a nodeId exists and create it if not
*
* @param nodeIdFactory used to create the nodeId if needed
* @return the newly created NodeId or the old one if already present
*/
UUID setup(Supplier<? extends UUID> nodeIdFactory);
State readState();
void writeState(State state);
@Override
default void close() throws Exception {
}
}

View File

@ -1722,6 +1722,27 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="jdbc-lock-acquisition-timeout" type="xsd:int" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
The max allowed time in milliseconds while trying to acquire a JDBC lock.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="jdbc-lock-renew-period" type="xsd:int" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
The period in milliseconds of the keep alive service of a JDBC lock.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="jdbc-lock-expiration" type="xsd:int" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
The time in milliseconds a JDBC lock is considered valid without keeping it alive.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="jms-bindings-table-name" type="xsd:string" minOccurs="1" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>

View File

@ -0,0 +1,231 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl.jdbc;
import java.sql.SQLException;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class JdbcLeaseLockTest {
private static final long DEFAULT_LOCK_EXPIRATION_MILLIS = TimeUnit.SECONDS.toMillis(10);
private static final SQLProvider SQL_PROVIDER = new DerbySQLProvider.Factory().create(ActiveMQDefaultConfiguration.getDefaultNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER);
private static final String JDBC_URL = "jdbc:derby:memory:server_lock_db;create=true";
private static final String DRIVER_CLASS_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
private JdbcSharedStateManager jdbcSharedStateManager;
private LeaseLock lock() {
return lock(DEFAULT_LOCK_EXPIRATION_MILLIS);
}
private LeaseLock lock(long acquireMillis) {
try {
return JdbcSharedStateManager.createLiveLock(UUID.randomUUID().toString(), jdbcSharedStateManager.getConnection(), SQL_PROVIDER, acquireMillis, 0);
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
@Before
public void createLockTable() {
jdbcSharedStateManager = JdbcSharedStateManager.usingConnectionUrl(UUID.randomUUID().toString(), DEFAULT_LOCK_EXPIRATION_MILLIS, JDBC_URL, DRIVER_CLASS_NAME, SQL_PROVIDER);
}
@After
public void dropLockTable() throws Exception {
jdbcSharedStateManager.destroy();
jdbcSharedStateManager.close();
}
@Test
public void shouldAcquireLock() {
final LeaseLock lock = lock();
final boolean acquired = lock.tryAcquire();
Assert.assertTrue("Must acquire the lock!", acquired);
try {
Assert.assertTrue("The lock is been held by the caller!", lock.isHeldByCaller());
} finally {
lock.release();
}
}
@Test
public void shouldNotAcquireLockWhenAlreadyHeldByOthers() {
final LeaseLock lock = lock();
Assert.assertTrue("Must acquire the lock", lock.tryAcquire());
try {
Assert.assertTrue("Lock held by the caller", lock.isHeldByCaller());
final LeaseLock failingLock = lock();
Assert.assertFalse("lock already held by other", failingLock.tryAcquire());
Assert.assertFalse("lock already held by other", failingLock.isHeldByCaller());
Assert.assertTrue("lock already held by other", failingLock.isHeld());
} finally {
lock.release();
}
}
@Test
public void shouldNotAcquireLockTwice() {
final LeaseLock lock = lock();
Assert.assertTrue("Must acquire the lock", lock.tryAcquire());
try {
Assert.assertFalse("lock already acquired", lock.tryAcquire());
} finally {
lock.release();
}
}
@Test
public void shouldNotCorruptGuardedState() throws InterruptedException {
final AtomicLong sharedState = new AtomicLong(0);
final int producers = 2;
final int writesPerProducer = 10;
final long idleMillis = 1000;
final long millisToAcquireLock = writesPerProducer * (producers - 1) * idleMillis;
final LeaseLock.Pauser pauser = LeaseLock.Pauser.sleep(idleMillis, TimeUnit.MILLISECONDS);
final CountDownLatch finished = new CountDownLatch(producers);
final LeaseLock[] locks = new LeaseLock[producers];
final AtomicInteger lockIndex = new AtomicInteger(0);
final Runnable producerTask = () -> {
final LeaseLock lock = locks[lockIndex.getAndIncrement()];
try {
for (int i = 0; i < writesPerProducer; i++) {
final LeaseLock.AcquireResult acquireResult = lock.tryAcquire(millisToAcquireLock, pauser, () -> true);
if (acquireResult != LeaseLock.AcquireResult.Done) {
throw new IllegalStateException(acquireResult + " from " + Thread.currentThread());
}
//avoid the atomic getAndIncrement operation on purpose
sharedState.lazySet(sharedState.get() + 1);
lock.release();
}
} finally {
finished.countDown();
}
};
final Thread[] producerThreads = new Thread[producers];
for (int i = 0; i < producers; i++) {
locks[i] = lock();
producerThreads[i] = new Thread(producerTask);
}
Stream.of(producerThreads).forEach(Thread::start);
final long maxTestTime = millisToAcquireLock * writesPerProducer * producers;
Assert.assertTrue("Each producers must complete the writes", finished.await(maxTestTime, TimeUnit.MILLISECONDS));
Assert.assertEquals("locks hasn't mutual excluded producers", writesPerProducer * producers, sharedState.get());
}
@Test
public void shouldAcquireExpiredLock() throws InterruptedException {
final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1));
Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
try {
Thread.sleep(lock.expirationMillis() * 2);
Assert.assertFalse("lock is already expired", lock.isHeldByCaller());
Assert.assertFalse("lock is already expired", lock.isHeld());
Assert.assertTrue("lock is already expired", lock.tryAcquire());
} finally {
lock.release();
}
}
@Test
public void shouldOtherAcquireExpiredLock() throws InterruptedException {
final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1));
Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
try {
Thread.sleep(lock.expirationMillis() * 2);
Assert.assertFalse("lock is already expired", lock.isHeldByCaller());
Assert.assertFalse("lock is already expired", lock.isHeld());
final LeaseLock otherLock = lock(TimeUnit.SECONDS.toMillis(10));
try {
Assert.assertTrue("lock is already expired", otherLock.tryAcquire());
} finally {
otherLock.release();
}
} finally {
lock.release();
}
}
@Test
public void shouldRenewAcquiredLock() throws InterruptedException {
final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(10));
Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
try {
Assert.assertTrue("lock is owned", lock.renew());
} finally {
lock.release();
}
}
@Test
public void shouldNotRenewReleasedLock() throws InterruptedException {
final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(10));
Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
lock.release();
Assert.assertFalse("lock is already released", lock.isHeldByCaller());
Assert.assertFalse("lock is already released", lock.isHeld());
Assert.assertFalse("lock is already released", lock.renew());
}
@Test
public void shouldRenewExpiredLockNotAcquiredByOthers() throws InterruptedException {
final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1));
Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
try {
Thread.sleep(lock.expirationMillis() * 2);
Assert.assertFalse("lock is already expired", lock.isHeldByCaller());
Assert.assertFalse("lock is already expired", lock.isHeld());
Assert.assertTrue("lock is owned", lock.renew());
} finally {
lock.release();
}
}
@Test
public void shouldNotRenewLockAcquiredByOthers() throws InterruptedException {
final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1));
Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
try {
Thread.sleep(lock.expirationMillis() * 2);
Assert.assertFalse("lock is already expired", lock.isHeldByCaller());
Assert.assertFalse("lock is already expired", lock.isHeld());
final LeaseLock otherLock = lock(TimeUnit.SECONDS.toMillis(10));
Assert.assertTrue("lock is already expired", otherLock.tryAcquire());
try {
Assert.assertFalse("lock is owned by others", lock.renew());
} finally {
otherLock.release();
}
} finally {
lock.release();
}
}
}

View File

@ -436,6 +436,21 @@ To configure Apache ActiveMQ Artemis to use a database for persisting messages a
The JDBC network connection timeout in milliseconds. The default value
is 20000 milliseconds (ie 20 seconds).
- `jdbc-lock-acquisition-timeout`
The max allowed time in milliseconds while trying to acquire a JDBC lock. The default value
is 60000 milliseconds (ie 60 seconds).
- `jdbc-lock-renew-period`
The period in milliseconds of the keep alive service of a JDBC lock. The default value
is 2000 milliseconds (ie 2 seconds).
- `jdbc-lock-expiration`
The time in milliseconds a JDBC lock is considered valid without keeping it alive. The default value
is 20000 milliseconds (ie 20 seconds).
## Configuring Apache ActiveMQ Artemis for Zero Persistence