ARTEMIS-1447 JDBC NodeManager to support JDBC HA Shared Store

This commit is contained in:
Francesco Nigro 2017-09-09 18:41:30 +02:00 committed by Clebert Suconic
parent 909222181a
commit 09a5d6f1c6
18 changed files with 1896 additions and 17 deletions

View File

@ -19,9 +19,9 @@ package org.apache.activemq.artemis.api.config;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.ArtemisConstants;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
/**
@ -438,8 +438,17 @@ public final class ActiveMQDefaultConfiguration {
// Default large messages table name, used with Database storage type
private static final String DEFAULT_PAGE_STORE_TABLE_NAME = "PAGE_STORE";
// Default node manager store table name, used with Database storage type
private static final String DEFAULT_NODE_MANAGER_STORE_TABLE_NAME = "NODE_MANAGER_STORE";
private static final int DEFAULT_JDBC_NETWORK_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(20);
private static final long DEFAULT_JDBC_LOCK_RENEW_PERIOD_MILLIS = TimeUnit.SECONDS.toMillis(4);
private static final long DEFAULT_JDBC_LOCK_EXPIRATION_MILLIS = TimeUnit.SECONDS.toMillis(20);
private static final long DEFAULT_JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60);
// Default period to wait between connection TTL checks
public static final long DEFAULT_CONNECTION_TTL_CHECK_INTERVAL = 2000;
@ -1211,10 +1220,26 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_PAGE_STORE_TABLE_NAME;
}
public static String getDefaultNodeManagerStoreTableName() {
return DEFAULT_NODE_MANAGER_STORE_TABLE_NAME;
}
public static int getDefaultJdbcNetworkTimeout() {
return DEFAULT_JDBC_NETWORK_TIMEOUT;
}
public static long getDefaultJdbcLockRenewPeriodMillis() {
return DEFAULT_JDBC_LOCK_RENEW_PERIOD_MILLIS;
}
public static long getDefaultJdbcLockExpirationMillis() {
return DEFAULT_JDBC_LOCK_EXPIRATION_MILLIS;
}
public static long getDefaultJdbcLockAcquisitionTimeoutMillis() {
return DEFAULT_JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS;
}
public static long getDefaultConnectionTtlCheckInterval() {
return DEFAULT_CONNECTION_TTL_CHECK_INTERVAL;
}

View File

@ -18,6 +18,14 @@ package org.apache.activemq.artemis.jdbc.store.sql;
public class GenericSQLProvider implements SQLProvider {
/**
* The JDBC Node Manager shared state is contained in these 4 rows: each one is used exclusively for a specific purpose.
*/
private static final int STATE_ROW_ID = 0;
private static final int LIVE_LOCK_ROW_ID = 1;
private static final int BACKUP_LOCK_ROW_ID = 2;
private static final int NODE_ID_ROW_ID = 3;
// Default to lowest (MYSQL = 64k)
private static final long MAX_BLOB_SIZE = 64512;
@ -57,6 +65,42 @@ public class GenericSQLProvider implements SQLProvider {
private final String countJournalRecordsSQL;
private final String createNodeManagerStoreTableSQL;
private final String createStateSQL;
private final String createNodeIdSQL;
private final String createLiveLockSQL;
private final String createBackupLockSQL;
private final String tryAcquireLiveLockSQL;
private final String tryAcquireBackupLockSQL;
private final String tryReleaseLiveLockSQL;
private final String tryReleaseBackupLockSQL;
private final String isLiveLockedSQL;
private final String isBackupLockedSQL;
private final String renewLiveLockSQL;
private final String renewBackupLockSQL;
private final String currentTimestampSQL;
private final String writeStateSQL;
private final String readStateSQL;
private final String writeNodeIdSQL;
private final String readNodeIdSQL;
protected final DatabaseStoreType databaseStoreType;
protected GenericSQLProvider(String tableName, DatabaseStoreType databaseStoreType) {
@ -64,8 +108,7 @@ public class GenericSQLProvider implements SQLProvider {
this.databaseStoreType = databaseStoreType;
createFileTableSQL = "CREATE TABLE " + tableName +
"(ID BIGINT AUTO_INCREMENT, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))";
createFileTableSQL = "CREATE TABLE " + tableName + "(ID BIGINT AUTO_INCREMENT, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))";
insertFileSQL = "INSERT INTO " + tableName + " (FILENAME, EXTENSION, DATA) VALUES (?,?,?)";
@ -81,17 +124,13 @@ public class GenericSQLProvider implements SQLProvider {
updateFileNameByIdSQL = "UPDATE " + tableName + " SET FILENAME=? WHERE ID=?";
cloneFileRecordSQL = "INSERT INTO " + tableName + "(FILENAME, EXTENSION, DATA) " +
"(SELECT FILENAME, EXTENSION, DATA FROM " + tableName + " WHERE ID=?)";
cloneFileRecordSQL = "INSERT INTO " + tableName + "(FILENAME, EXTENSION, DATA) " + "(SELECT FILENAME, EXTENSION, DATA FROM " + tableName + " WHERE ID=?)";
copyFileRecordByIdSQL = "UPDATE " + tableName + " SET DATA = (SELECT DATA FROM " + tableName + " WHERE ID=?) WHERE ID=?";
dropFileTableSQL = "DROP TABLE " + tableName;
createJournalTableSQL = new String[] {
"CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BLOB,txDataSize INTEGER,txData BLOB,txCheckNoRecords INTEGER,seq BIGINT NOT NULL, PRIMARY KEY(seq))",
"CREATE INDEX " + tableName + "_IDX ON " + tableName + " (id)"
};
createJournalTableSQL = new String[]{"CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BLOB,txDataSize INTEGER,txData BLOB,txCheckNoRecords INTEGER,seq BIGINT NOT NULL, PRIMARY KEY(seq))", "CREATE INDEX " + tableName + "_IDX ON " + tableName + " (id)"};
insertJournalRecordsSQL = "INSERT INTO " + tableName + "(id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,seq) " + "VALUES (?,?,?,?,?,?,?,?,?,?,?)";
@ -102,6 +141,43 @@ public class GenericSQLProvider implements SQLProvider {
deleteJournalTxRecordsSQL = "DELETE FROM " + tableName + " WHERE txId=?";
countJournalRecordsSQL = "SELECT COUNT(*) FROM " + tableName;
createNodeManagerStoreTableSQL = "CREATE TABLE " + tableName + " ( ID INT NOT NULL, HOLDER_ID VARCHAR(128), HOLDER_EXPIRATION_TIME TIMESTAMP, NODE_ID CHAR(36),STATE CHAR(1), PRIMARY KEY(ID))";
createStateSQL = "INSERT INTO " + tableName + " (ID) VALUES (" + STATE_ROW_ID + ")";
createNodeIdSQL = "INSERT INTO " + tableName + " (ID) VALUES (" + NODE_ID_ROW_ID + ")";
createLiveLockSQL = "INSERT INTO " + tableName + " (ID) VALUES (" + LIVE_LOCK_ROW_ID + ")";
createBackupLockSQL = "INSERT INTO " + tableName + " (ID) VALUES (" + BACKUP_LOCK_ROW_ID + ")";
tryAcquireLiveLockSQL = "UPDATE " + tableName + " SET HOLDER_ID = ?, HOLDER_EXPIRATION_TIME = ? WHERE (HOLDER_EXPIRATION_TIME IS NULL OR HOLDER_EXPIRATION_TIME < CURRENT_TIMESTAMP) AND ID = " + LIVE_LOCK_ROW_ID;
tryAcquireBackupLockSQL = "UPDATE " + tableName + " SET HOLDER_ID = ?, HOLDER_EXPIRATION_TIME = ? WHERE (HOLDER_EXPIRATION_TIME IS NULL OR HOLDER_EXPIRATION_TIME < CURRENT_TIMESTAMP) AND ID = " + BACKUP_LOCK_ROW_ID;
tryReleaseLiveLockSQL = "UPDATE " + tableName + " SET HOLDER_ID = NULL, HOLDER_EXPIRATION_TIME = NULL WHERE HOLDER_ID = ? AND ID = " + LIVE_LOCK_ROW_ID;
tryReleaseBackupLockSQL = "UPDATE " + tableName + " SET HOLDER_ID = NULL, HOLDER_EXPIRATION_TIME = NULL WHERE HOLDER_ID = ? AND ID = " + BACKUP_LOCK_ROW_ID;
isLiveLockedSQL = "SELECT HOLDER_ID, HOLDER_EXPIRATION_TIME FROM " + tableName + " WHERE ID = " + LIVE_LOCK_ROW_ID;
isBackupLockedSQL = "SELECT HOLDER_ID, HOLDER_EXPIRATION_TIME FROM " + tableName + " WHERE ID = " + BACKUP_LOCK_ROW_ID;
renewLiveLockSQL = "UPDATE " + tableName + " SET HOLDER_EXPIRATION_TIME = ? WHERE HOLDER_ID = ? AND ID = " + LIVE_LOCK_ROW_ID;
renewBackupLockSQL = "UPDATE " + tableName + " SET HOLDER_EXPIRATION_TIME = ? WHERE HOLDER_ID = ? AND ID = " + BACKUP_LOCK_ROW_ID;
currentTimestampSQL = "SELECT CURRENT_TIMESTAMP FROM " + tableName;
writeStateSQL = "UPDATE " + tableName + " SET STATE = ? WHERE ID = " + STATE_ROW_ID;
readStateSQL = "SELECT STATE FROM " + tableName + " WHERE ID = " + STATE_ROW_ID;
writeNodeIdSQL = "UPDATE " + tableName + " SET NODE_ID = ? WHERE ID = " + NODE_ID_ROW_ID;
readNodeIdSQL = "SELECT NODE_ID FROM " + tableName + " WHERE ID = " + NODE_ID_ROW_ID;
}
@Override
@ -201,6 +277,96 @@ public class GenericSQLProvider implements SQLProvider {
return dropFileTableSQL;
}
@Override
public String createNodeManagerStoreTableSQL() {
return createNodeManagerStoreTableSQL;
}
@Override
public String createStateSQL() {
return createStateSQL;
}
@Override
public String createNodeIdSQL() {
return createNodeIdSQL;
}
@Override
public String createLiveLockSQL() {
return createLiveLockSQL;
}
@Override
public String createBackupLockSQL() {
return createBackupLockSQL;
}
@Override
public String tryAcquireLiveLockSQL() {
return tryAcquireLiveLockSQL;
}
@Override
public String tryAcquireBackupLockSQL() {
return tryAcquireBackupLockSQL;
}
@Override
public String tryReleaseLiveLockSQL() {
return tryReleaseLiveLockSQL;
}
@Override
public String tryReleaseBackupLockSQL() {
return tryReleaseBackupLockSQL;
}
@Override
public String isLiveLockedSQL() {
return isLiveLockedSQL;
}
@Override
public String isBackupLockedSQL() {
return isBackupLockedSQL;
}
@Override
public String renewLiveLockSQL() {
return renewLiveLockSQL;
}
@Override
public String renewBackupLockSQL() {
return renewBackupLockSQL;
}
@Override
public String currentTimestampSQL() {
return currentTimestampSQL;
}
@Override
public String writeStateSQL() {
return writeStateSQL;
}
@Override
public String readStateSQL() {
return readStateSQL;
}
@Override
public String writeNodeIdSQL() {
return writeNodeIdSQL;
}
@Override
public String readNodeIdSQL() {
return readNodeIdSQL;
}
@Override
public boolean closeConnectionOnShutdown() {
return true;

View File

@ -19,7 +19,7 @@ package org.apache.activemq.artemis.jdbc.store.sql;
public interface SQLProvider {
enum DatabaseStoreType {
PAGE, MESSAGE_JOURNAL, BINDINGS_JOURNAL, LARGE_MESSAGE
PAGE, MESSAGE_JOURNAL, BINDINGS_JOURNAL, LARGE_MESSAGE, NODE_MANAGER
}
long getMaxBlobSize();
@ -62,7 +62,44 @@ public interface SQLProvider {
boolean closeConnectionOnShutdown();
String createNodeManagerStoreTableSQL();
String createStateSQL();
String createNodeIdSQL();
String createLiveLockSQL();
String createBackupLockSQL();
String tryAcquireLiveLockSQL();
String tryAcquireBackupLockSQL();
String tryReleaseLiveLockSQL();
String tryReleaseBackupLockSQL();
String isLiveLockedSQL();
String isBackupLockedSQL();
String renewLiveLockSQL();
String renewBackupLockSQL();
String currentTimestampSQL();
String writeStateSQL();
String readStateSQL();
String writeNodeIdSQL();
String readNodeIdSQL();
interface Factory {
SQLProvider create(String tableName, DatabaseStoreType dbStoreType);
}
}

View File

@ -117,7 +117,12 @@
<scope>test</scope>
<type>test-jar</type>
</dependency>
<!-- db test -->
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>

View File

@ -32,6 +32,8 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
private String pageStoreTableName = ActiveMQDefaultConfiguration.getDefaultPageStoreTableName();
private String nodeManagerStoreTableName = ActiveMQDefaultConfiguration.getDefaultNodeManagerStoreTableName();
private String jdbcConnectionUrl = ActiveMQDefaultConfiguration.getDefaultDatabaseUrl();
private String jdbcDriverClassName = ActiveMQDefaultConfiguration.getDefaultDriverClassName();
@ -42,6 +44,12 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
private int jdbcNetworkTimeout = ActiveMQDefaultConfiguration.getDefaultJdbcNetworkTimeout();
private long jdbcLockRenewPeriodMillis = ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis();
private long jdbcLockExpirationMillis = ActiveMQDefaultConfiguration.getDefaultJdbcLockExpirationMillis();
private long jdbcLockAcquisitionTimeoutMillis = ActiveMQDefaultConfiguration.getDefaultJdbcLockAcquisitionTimeoutMillis();
@Override
public StoreType getStoreType() {
return StoreType.DATABASE;
@ -75,6 +83,14 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
return pageStoreTableName;
}
public void setNodeManagerStoreTableName(String nodeManagerStoreTableName) {
this.nodeManagerStoreTableName = nodeManagerStoreTableName;
}
public String getNodeManagerStoreTableName() {
return nodeManagerStoreTableName;
}
public void setPageStoreTableName(String pageStoreTableName) {
this.pageStoreTableName = pageStoreTableName;
}
@ -135,4 +151,28 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
public void setJdbcNetworkTimeout(int jdbcNetworkTimeout) {
this.jdbcNetworkTimeout = jdbcNetworkTimeout;
}
public long getJdbcLockRenewPeriodMillis() {
return jdbcLockRenewPeriodMillis;
}
public void setJdbcLockRenewPeriodMillis(long jdbcLockRenewPeriodMillis) {
this.jdbcLockRenewPeriodMillis = jdbcLockRenewPeriodMillis;
}
public long getJdbcLockExpirationMillis() {
return jdbcLockExpirationMillis;
}
public void setJdbcLockExpirationMillis(long jdbcLockExpirationMillis) {
this.jdbcLockExpirationMillis = jdbcLockExpirationMillis;
}
public long getJdbcLockAcquisitionTimeoutMillis() {
return jdbcLockAcquisitionTimeoutMillis;
}
public void setJdbcLockAcquisitionTimeoutMillis(long jdbcLockAcquisitionTimeoutMillis) {
this.jdbcLockAcquisitionTimeoutMillis = jdbcLockAcquisitionTimeoutMillis;
}
}

View File

@ -31,8 +31,6 @@ import java.util.Set;
import org.apache.activemq.artemis.ArtemisConstants;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.config.TransformerConfiguration;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
@ -52,6 +50,7 @@ import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
import org.apache.activemq.artemis.core.config.TransformerConfiguration;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.config.ha.ColocatedPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
@ -84,6 +83,7 @@ import org.apache.activemq.artemis.utils.PasswordMaskingUtil;
import org.apache.activemq.artemis.utils.SensitiveDataCodec;
import org.apache.activemq.artemis.utils.XMLConfigurationUtil;
import org.apache.activemq.artemis.utils.XMLUtil;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
import org.w3c.dom.Element;
import org.w3c.dom.NamedNodeMap;
import org.w3c.dom.Node;
@ -1422,6 +1422,9 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
conf.setJdbcConnectionUrl(getString(storeNode, "jdbc-connection-url", conf.getJdbcConnectionUrl(), Validators.NO_CHECK));
conf.setJdbcDriverClassName(getString(storeNode, "jdbc-driver-class-name", conf.getJdbcDriverClassName(), Validators.NO_CHECK));
conf.setJdbcNetworkTimeout(getInteger(storeNode, "jdbc-network-timeout", conf.getJdbcNetworkTimeout(), Validators.NO_CHECK));
conf.setJdbcLockAcquisitionTimeoutMillis(getLong(storeNode, "jdbc-lock-acquisition-timeout", conf.getJdbcLockAcquisitionTimeoutMillis(), Validators.NO_CHECK));
conf.setJdbcLockRenewPeriodMillis(getLong(storeNode, "jdbc-lock-renew-period", conf.getJdbcLockRenewPeriodMillis(), Validators.NO_CHECK));
conf.setJdbcLockExpirationMillis(getLong(storeNode, "jdbc-lock-expiration", conf.getJdbcLockExpirationMillis(), Validators.NO_CHECK));
return conf;
}

View File

@ -127,7 +127,7 @@ public abstract class NodeManager implements ActiveMQComponent {
isStarted = false;
}
public final void stopBackup() throws Exception {
public void stopBackup() throws Exception {
if (replicatedBackup && getNodeId() != null) {
setUpServerLockFile();
}

View File

@ -49,10 +49,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -136,7 +135,6 @@ import org.apache.activemq.artemis.core.server.ServiceComponent;
import org.apache.activemq.artemis.core.server.ServiceRegistry;
import org.apache.activemq.artemis.core.server.cluster.BackupManager;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.server.cluster.ha.HAPolicy;
import org.apache.activemq.artemis.core.server.files.FileMoveManager;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
@ -144,6 +142,7 @@ import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
import org.apache.activemq.artemis.core.server.group.impl.LocalGroupingHandler;
import org.apache.activemq.artemis.core.server.group.impl.RemoteGroupingHandler;
import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable;
@ -151,6 +150,7 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.server.reload.ReloadCallback;
import org.apache.activemq.artemis.core.server.reload.ReloadManager;
import org.apache.activemq.artemis.core.server.reload.ReloadManagerImpl;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
@ -176,6 +176,7 @@ import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.critical.CriticalAction;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerImpl;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
import org.apache.activemq.artemis.utils.critical.CriticalComponent;
import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
import org.jboss.logging.Logger;
@ -456,6 +457,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
NodeManager manager;
if (!configuration.isPersistenceEnabled()) {
manager = new InVMNodeManager(replicatingBackup);
} else if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
if (replicatingBackup) {
throw new IllegalArgumentException("replicatingBackup is not supported yet while using JDBC persistence");
}
final DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) configuration.getStoreConfiguration();
manager = JdbcNodeManager.with(dbConf, scheduledPool, executorFactory, shutdownOnCriticalIO);
} else {
manager = new FileLockNodeManager(directory, replicatingBackup, configuration.getJournalLockAcquisitionTimeout());
}

View File

@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl.jdbc;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.jboss.logging.Logger;
/**
* Default implementation of a {@link ScheduledLeaseLock}: see {@link ScheduledLeaseLock#of(ScheduledExecutorService, ArtemisExecutor, String, LeaseLock, long, IOCriticalErrorListener)}.
*/
final class ActiveMQScheduledLeaseLock extends ActiveMQScheduledComponent implements ScheduledLeaseLock {
private static final Logger LOGGER = Logger.getLogger(ActiveMQScheduledLeaseLock.class);
private final String lockName;
private final LeaseLock lock;
private long lastLockRenewStart;
private final long renewPeriodMillis;
private final IOCriticalErrorListener ioCriticalErrorListener;
ActiveMQScheduledLeaseLock(ScheduledExecutorService scheduledExecutorService,
ArtemisExecutor executor,
String lockName,
LeaseLock lock,
long renewPeriodMillis,
IOCriticalErrorListener ioCriticalErrorListener) {
super(scheduledExecutorService, executor, 0, renewPeriodMillis, TimeUnit.MILLISECONDS, false);
if (renewPeriodMillis >= lock.expirationMillis()) {
throw new IllegalArgumentException("renewPeriodMillis must be < lock's expirationMillis");
}
this.lockName = lockName;
this.lock = lock;
this.renewPeriodMillis = renewPeriodMillis;
//already expired start time
this.lastLockRenewStart = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(lock.expirationMillis());
this.ioCriticalErrorListener = ioCriticalErrorListener;
}
@Override
public long renewPeriodMillis() {
return renewPeriodMillis;
}
@Override
public LeaseLock lock() {
return lock;
}
@Override
public synchronized void start() {
if (isStarted()) {
return;
}
this.lastLockRenewStart = System.nanoTime();
super.start();
}
@Override
public synchronized void stop() {
if (!isStarted()) {
return;
}
super.stop();
}
@Override
public void run() {
final long lastRenewStart = this.lastLockRenewStart;
final long renewStart = System.nanoTime();
if (!this.lock.renew()) {
ioCriticalErrorListener.onIOException(new IllegalStateException(lockName + " lock can't be renewed"), "Critical error while on " + lockName + " renew", null);
}
//logic to detect slowness of DB and/or the scheduled executor service
detectAndReportRenewSlowness(lockName, lastRenewStart, renewStart, renewPeriodMillis, lock.expirationMillis());
this.lastLockRenewStart = lastRenewStart;
}
private static void detectAndReportRenewSlowness(String lockName,
long lastRenewStart,
long renewStart,
long expectedRenewPeriodMillis,
long expirationMillis) {
final long elapsedMillisToRenew = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - renewStart);
if (elapsedMillisToRenew > expectedRenewPeriodMillis) {
LOGGER.error(lockName + " lock renew tooks " + elapsedMillisToRenew + " ms, while is supposed to take <" + expectedRenewPeriodMillis + " ms");
}
final long measuredRenewPeriodNanos = renewStart - lastRenewStart;
final long measuredRenewPeriodMillis = TimeUnit.NANOSECONDS.toMillis(measuredRenewPeriodNanos);
if (measuredRenewPeriodMillis > expirationMillis) {
LOGGER.error(lockName + " lock renew period lasted " + measuredRenewPeriodMillis + " ms instead of " + expectedRenewPeriodMillis + " ms");
} else if (measuredRenewPeriodMillis > expectedRenewPeriodMillis) {
LOGGER.warn(lockName + " lock renew period lasted " + measuredRenewPeriodMillis + " ms instead of " + expectedRenewPeriodMillis + " ms");
}
}
}

View File

@ -0,0 +1,277 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl.jdbc;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Objects;
import java.util.function.Predicate;
import org.jboss.logging.Logger;
/**
* JDBC implementation of a {@link LeaseLock} with a {@code String} defined {@link #holderId()}.
*/
final class JdbcLeaseLock implements LeaseLock {
private static final Logger LOGGER = Logger.getLogger(JdbcLeaseLock.class);
private static final int MAX_HOLDER_ID_LENGTH = 128;
private final Connection connection;
private final long maxAllowableMillisDiffFromDBTime;
private long millisDiffFromCurrentTime;
private final String holderId;
private final PreparedStatement tryAcquireLock;
private final PreparedStatement tryReleaseLock;
private final PreparedStatement renewLock;
private final PreparedStatement isLocked;
private final PreparedStatement currentDateTime;
private final long expirationMillis;
private boolean maybeAcquired;
/**
* The lock will be responsible (ie {@link #close()}) of all the {@link PreparedStatement}s used by it, but not of the {@link Connection},
* whose life cycle will be managed externally.
*/
JdbcLeaseLock(String holderId,
Connection connection,
PreparedStatement tryAcquireLock,
PreparedStatement tryReleaseLock,
PreparedStatement renewLock,
PreparedStatement isLocked,
PreparedStatement currentDateTime,
long expirationMIllis,
long maxAllowableMillisDiffFromDBTime) {
if (holderId.length() > MAX_HOLDER_ID_LENGTH) {
throw new IllegalArgumentException("holderId length must be <=" + MAX_HOLDER_ID_LENGTH);
}
this.holderId = holderId;
this.maxAllowableMillisDiffFromDBTime = maxAllowableMillisDiffFromDBTime;
this.millisDiffFromCurrentTime = Long.MAX_VALUE;
this.tryAcquireLock = tryAcquireLock;
this.tryReleaseLock = tryReleaseLock;
this.renewLock = renewLock;
this.isLocked = isLocked;
this.currentDateTime = currentDateTime;
this.expirationMillis = expirationMIllis;
this.maybeAcquired = false;
this.connection = connection;
}
public String holderId() {
return holderId;
}
@Override
public long expirationMillis() {
return expirationMillis;
}
private long timeDifference() throws SQLException {
if (Long.MAX_VALUE == millisDiffFromCurrentTime) {
if (maxAllowableMillisDiffFromDBTime > 0) {
millisDiffFromCurrentTime = determineTimeDifference();
} else {
millisDiffFromCurrentTime = 0L;
}
}
return millisDiffFromCurrentTime;
}
private long determineTimeDifference() throws SQLException {
try (ResultSet resultSet = currentDateTime.executeQuery()) {
long result = 0L;
if (resultSet.next()) {
final Timestamp timestamp = resultSet.getTimestamp(1);
final long diff = System.currentTimeMillis() - timestamp.getTime();
if (Math.abs(diff) > maxAllowableMillisDiffFromDBTime) {
// off by more than maxAllowableMillisDiffFromDBTime so lets adjust
result = (-diff);
}
LOGGER.info(holderId() + " diff adjust from db: " + result + ", db time: " + timestamp);
}
return result;
}
}
@Override
public boolean renew() {
synchronized (connection) {
try {
final boolean result;
connection.setAutoCommit(false);
try {
final long timeDifference = timeDifference();
final PreparedStatement preparedStatement = this.renewLock;
final long now = System.currentTimeMillis() + timeDifference;
final Timestamp timestamp = new Timestamp(now + expirationMillis);
preparedStatement.setTimestamp(1, timestamp);
preparedStatement.setString(2, holderId);
result = preparedStatement.executeUpdate() == 1;
} catch (SQLException ie) {
connection.rollback();
connection.setAutoCommit(true);
throw new IllegalStateException(ie);
}
connection.commit();
connection.setAutoCommit(true);
return result;
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
}
@Override
public boolean tryAcquire() {
synchronized (connection) {
try {
final boolean acquired;
connection.setAutoCommit(false);
try {
final long timeDifference = timeDifference();
final PreparedStatement preparedStatement = tryAcquireLock;
final long now = System.currentTimeMillis() + timeDifference;
preparedStatement.setString(1, holderId);
final Timestamp timestamp = new Timestamp(now + expirationMillis);
preparedStatement.setTimestamp(2, timestamp);
acquired = preparedStatement.executeUpdate() == 1;
} catch (SQLException ie) {
connection.rollback();
connection.setAutoCommit(true);
throw new IllegalStateException(ie);
}
connection.commit();
connection.setAutoCommit(true);
if (acquired) {
this.maybeAcquired = true;
}
return acquired;
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
}
@Override
public boolean isHeld() {
return checkValidHolderId(Objects::nonNull);
}
@Override
public boolean isHeldByCaller() {
return checkValidHolderId(this.holderId::equals);
}
private boolean checkValidHolderId(Predicate<? super String> holderIdFilter) {
synchronized (connection) {
try {
boolean result;
connection.setAutoCommit(false);
try {
final long timeDifference = timeDifference();
final PreparedStatement preparedStatement = this.isLocked;
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (!resultSet.next()) {
result = false;
} else {
final String currentHolderId = resultSet.getString(1);
result = holderIdFilter.test(currentHolderId);
//warn about any zombie lock
final Timestamp timestamp = resultSet.getTimestamp(2);
if (timestamp != null) {
final long lockExpirationTime = timestamp.getTime();
final long now = System.currentTimeMillis() + timeDifference;
final long expiredBy = now - lockExpirationTime;
if (expiredBy > 0) {
result = false;
LOGGER.warn("found zombie lock with holderId: " + currentHolderId + " expired by: " + expiredBy + " ms");
}
}
}
}
} catch (SQLException ie) {
connection.rollback();
connection.setAutoCommit(true);
throw new IllegalStateException(ie);
}
connection.commit();
connection.setAutoCommit(true);
return result;
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
}
@Override
public void release() {
synchronized (connection) {
try {
connection.setAutoCommit(false);
try {
final PreparedStatement preparedStatement = this.tryReleaseLock;
preparedStatement.setString(1, holderId);
if (preparedStatement.executeUpdate() != 1) {
LOGGER.warn(holderId + " has failed to release a lock");
} else {
LOGGER.info(holderId + " has released a lock");
}
//consider it as released to avoid on finalize to be reclaimed
this.maybeAcquired = false;
} catch (SQLException ie) {
connection.rollback();
connection.setAutoCommit(true);
throw new IllegalStateException(ie);
}
connection.commit();
connection.setAutoCommit(true);
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
}
@Override
public void close() throws SQLException {
synchronized (connection) {
//to avoid being called if not needed
if (!this.tryReleaseLock.isClosed()) {
try {
if (this.maybeAcquired) {
release();
}
} finally {
this.tryReleaseLock.close();
this.tryAcquireLock.close();
this.renewLock.close();
this.isLocked.close();
this.currentDateTime.close();
}
}
}
}
@Override
protected void finalize() throws Throwable {
close();
}
}

View File

@ -0,0 +1,380 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl.jdbc;
import javax.sql.DataSource;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.jboss.logging.Logger;
/**
* JDBC implementation of {@link NodeManager}.
*/
public final class JdbcNodeManager extends NodeManager {
private static final Logger logger = Logger.getLogger(JdbcNodeManager.class);
private static final long MAX_PAUSE_MILLIS = 2000L;
private final SharedStateManager sharedStateManager;
private final ScheduledLeaseLock scheduledLiveLock;
private final ScheduledLeaseLock scheduledBackupLock;
private final long lockRenewPeriodMillis;
private final long lockAcquisitionTimeoutMillis;
private volatile boolean interrupted = false;
private final LeaseLock.Pauser pauser;
private final IOCriticalErrorListener ioCriticalErrorListener;
public static JdbcNodeManager with(DatabaseStorageConfiguration configuration,
ScheduledExecutorService scheduledExecutorService,
ExecutorFactory executorFactory,
IOCriticalErrorListener ioCriticalErrorListener) {
if (configuration.getDataSource() != null) {
final String brokerId = java.util.UUID.randomUUID().toString();
return usingDataSource(brokerId, configuration.getJdbcLockExpirationMillis(), configuration.getJdbcLockRenewPeriodMillis(), configuration.getJdbcLockAcquisitionTimeoutMillis(), configuration.getDataSource(), configuration.getSqlProviderFactory().create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER), scheduledExecutorService, executorFactory, ioCriticalErrorListener);
} else {
final String brokerId = java.util.UUID.randomUUID().toString();
return usingConnectionUrl(brokerId, configuration.getJdbcLockExpirationMillis(), configuration.getJdbcLockRenewPeriodMillis(), configuration.getJdbcLockAcquisitionTimeoutMillis(), configuration.getJdbcConnectionUrl(), configuration.getJdbcDriverClassName(), configuration.getSqlProviderFactory().create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER), scheduledExecutorService, executorFactory, ioCriticalErrorListener);
}
}
static JdbcNodeManager usingDataSource(String brokerId,
long lockExpirationMillis,
long lockRenewPeriodMillis,
long lockAcquisitionTimeoutMillis,
DataSource dataSource,
SQLProvider provider,
ScheduledExecutorService scheduledExecutorService,
ExecutorFactory executorFactory,
IOCriticalErrorListener ioCriticalErrorListener) {
return new JdbcNodeManager(JdbcSharedStateManager.usingDataSource(brokerId, lockExpirationMillis, dataSource, provider), false, lockRenewPeriodMillis, lockAcquisitionTimeoutMillis, scheduledExecutorService, executorFactory, ioCriticalErrorListener);
}
public static JdbcNodeManager usingConnectionUrl(String brokerId,
long lockExpirationMillis,
long lockRenewPeriodMillis,
long lockAcquisitionTimeoutMillis,
String jdbcUrl,
String driverClass,
SQLProvider provider,
ScheduledExecutorService scheduledExecutorService,
ExecutorFactory executorFactory,
IOCriticalErrorListener ioCriticalErrorListener) {
return new JdbcNodeManager(JdbcSharedStateManager.usingConnectionUrl(brokerId, lockExpirationMillis, jdbcUrl, driverClass, provider), false, lockRenewPeriodMillis, lockAcquisitionTimeoutMillis, scheduledExecutorService, executorFactory, ioCriticalErrorListener);
}
private JdbcNodeManager(final SharedStateManager sharedStateManager,
boolean replicatedBackup,
long lockRenewPeriodMillis,
long lockAcquisitionTimeoutMillis,
ScheduledExecutorService scheduledExecutorService,
ExecutorFactory executorFactory,
IOCriticalErrorListener ioCriticalErrorListener) {
super(replicatedBackup, null);
this.lockAcquisitionTimeoutMillis = lockAcquisitionTimeoutMillis;
this.lockRenewPeriodMillis = lockRenewPeriodMillis;
this.pauser = LeaseLock.Pauser.sleep(Math.min(this.lockRenewPeriodMillis, MAX_PAUSE_MILLIS), TimeUnit.MILLISECONDS);
this.sharedStateManager = sharedStateManager;
this.scheduledLiveLock = ScheduledLeaseLock.of(scheduledExecutorService, executorFactory.getExecutor(), "live", this.sharedStateManager.liveLock(), lockRenewPeriodMillis, ioCriticalErrorListener);
this.scheduledBackupLock = ScheduledLeaseLock.of(scheduledExecutorService, executorFactory.getExecutor(), "backup", this.sharedStateManager.backupLock(), lockRenewPeriodMillis, ioCriticalErrorListener);
this.ioCriticalErrorListener = ioCriticalErrorListener;
}
@Override
public synchronized void start() throws Exception {
if (isStarted()) {
return;
}
if (!replicatedBackup) {
final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID);
setUUID(nodeId);
}
super.start();
}
@Override
public synchronized void stop() throws Exception {
if (isStarted()) {
try {
this.scheduledLiveLock.stop();
this.scheduledBackupLock.stop();
} finally {
super.stop();
this.sharedStateManager.close();
}
}
}
@Override
protected void finalize() throws Throwable {
stop();
}
@Override
public boolean isAwaitingFailback() throws Exception {
return readSharedState() == SharedStateManager.State.FAILING_BACK;
}
@Override
public boolean isBackupLive() throws Exception {
//is anyone holding the live lock?
return this.scheduledLiveLock.lock().isHeld();
}
@Override
public void stopBackup() throws Exception {
if (replicatedBackup) {
final UUID nodeId = getUUID();
sharedStateManager.writeNodeId(nodeId);
}
releaseBackup();
}
@Override
public void interrupt() {
//need to be volatile: must be called concurrently to work as expected
interrupted = true;
}
@Override
public void releaseBackup() throws Exception {
if (this.scheduledBackupLock.lock().isHeldByCaller()) {
this.scheduledBackupLock.stop();
this.scheduledBackupLock.lock().release();
}
}
/**
* Try to acquire a lock, failing with an exception otherwise.
*/
private void lock(LeaseLock lock) throws Exception {
final LeaseLock.AcquireResult acquireResult = lock.tryAcquire(this.lockAcquisitionTimeoutMillis, this.pauser, () -> !this.interrupted);
switch (acquireResult) {
case Timeout:
throw new Exception("timed out waiting for lock");
case Exit:
this.interrupted = false;
throw new InterruptedException("LeaseLock was interrupted");
case Done:
break;
default:
throw new AssertionError(acquireResult + " not managed");
}
}
private void checkInterrupted(Supplier<String> message) throws InterruptedException {
if (this.interrupted) {
interrupted = false;
throw new InterruptedException(message.get());
}
}
private void renewLiveLockIfNeeded(final long acquiredOn) {
final long acquiredMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - acquiredOn);
if (acquiredMillis > this.scheduledLiveLock.renewPeriodMillis()) {
if (!this.scheduledLiveLock.lock().renew()) {
final IllegalStateException e = new IllegalStateException("live lock can't be renewed");
try {
ioCriticalErrorListener.onIOException(e, "live lock can't be renewed", null);
} finally {
throw e;
}
}
}
}
/**
* Lock live node and check for a live state, taking care to renew it (if needed) or releasing it otherwise
*/
private boolean lockLiveAndCheckLiveState() throws Exception {
lock(this.scheduledLiveLock.lock());
final long acquiredOn = System.nanoTime();
boolean liveWhileLocked = false;
//check if the state is live
final SharedStateManager.State stateWhileLocked;
try {
stateWhileLocked = readSharedState();
} catch (Throwable t) {
logger.error("error while holding the live node lock and tried to read the shared state", t);
this.scheduledLiveLock.lock().release();
throw t;
}
if (stateWhileLocked == SharedStateManager.State.LIVE) {
renewLiveLockIfNeeded(acquiredOn);
liveWhileLocked = true;
} else {
if (logger.isDebugEnabled()) {
logger.debug("state is " + stateWhileLocked + " while holding the live lock");
}
//state is not live: can (try to) release the lock
this.scheduledLiveLock.lock().release();
}
return liveWhileLocked;
}
@Override
public void awaitLiveNode() throws Exception {
boolean liveWhileLocked = false;
while (!liveWhileLocked) {
//check first without holding any lock
final SharedStateManager.State state = readSharedState();
if (state == SharedStateManager.State.LIVE) {
//verify if the state is live while holding the live node lock too
liveWhileLocked = lockLiveAndCheckLiveState();
} else {
if (logger.isDebugEnabled()) {
logger.debug("awaiting live node...state: " + state);
}
}
if (!liveWhileLocked) {
checkInterrupted(() -> "awaitLiveNode got interrupted!");
pauser.idle();
}
}
//state is LIVE and live lock is acquired and valid
logger.debug("acquired live node lock");
this.scheduledLiveLock.start();
}
@Override
public void startBackup() throws Exception {
assert !replicatedBackup; // should not be called if this is a replicating backup
ActiveMQServerLogger.LOGGER.waitingToBecomeBackup();
lock(scheduledBackupLock.lock());
scheduledBackupLock.start();
ActiveMQServerLogger.LOGGER.gotBackupLock();
if (getUUID() == null)
readNodeId();
}
@Override
public ActivateCallback startLiveNode() throws Exception {
setFailingBack();
final String timeoutMessage = lockAcquisitionTimeoutMillis == -1 ? "indefinitely" : lockAcquisitionTimeoutMillis + " milliseconds";
ActiveMQServerLogger.LOGGER.waitingToObtainLiveLock(timeoutMessage);
lock(this.scheduledLiveLock.lock());
this.scheduledLiveLock.start();
ActiveMQServerLogger.LOGGER.obtainedLiveLock();
return new ActivateCallback() {
@Override
public void preActivate() {
}
@Override
public void activated() {
}
@Override
public void deActivate() {
}
@Override
public void activationComplete() {
try {
//state can be written only if the live renew task is running
setLive();
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
}
}
};
}
@Override
public void pauseLiveServer() throws Exception {
if (scheduledLiveLock.isStarted()) {
setPaused();
scheduledLiveLock.stop();
scheduledLiveLock.lock().release();
} else if (scheduledLiveLock.lock().renew()) {
setPaused();
scheduledLiveLock.lock().release();
} else {
final IllegalStateException e = new IllegalStateException("live lock can't be renewed");
try {
ioCriticalErrorListener.onIOException(e, "live lock can't be renewed on pauseLiveServer", null);
} finally {
throw e;
}
}
}
@Override
public void crashLiveServer() throws Exception {
if (this.scheduledLiveLock.lock().isHeldByCaller()) {
scheduledLiveLock.stop();
this.scheduledLiveLock.lock().release();
}
}
@Override
public void awaitLiveStatus() {
while (readSharedState() != SharedStateManager.State.LIVE) {
pauser.idle();
}
}
private void setLive() {
writeSharedState(SharedStateManager.State.LIVE);
}
private void setFailingBack() {
writeSharedState(SharedStateManager.State.FAILING_BACK);
}
private void setPaused() {
writeSharedState(SharedStateManager.State.PAUSED);
}
private void writeSharedState(SharedStateManager.State state) {
assert !this.replicatedBackup : "the replicated backup can't write the shared state!";
this.sharedStateManager.writeState(state);
}
private SharedStateManager.State readSharedState() {
return this.sharedStateManager.readState();
}
@Override
public SimpleString readNodeId() {
final UUID nodeId = this.sharedStateManager.readNodeId();
setUUID(nodeId);
return getNodeId();
}
}

View File

@ -0,0 +1,302 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl.jdbc;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.function.Supplier;
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.utils.UUID;
/**
* JDBC implementation of a {@link SharedStateManager}.
*/
@SuppressWarnings("SynchronizeOnNonFinalField")
final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedStateManager {
private final String holderId;
private final long lockExpirationMillis;
private JdbcLeaseLock liveLock;
private JdbcLeaseLock backupLock;
private PreparedStatement readNodeId;
private PreparedStatement writeNodeId;
private PreparedStatement readState;
private PreparedStatement writeState;
public static JdbcSharedStateManager usingDataSource(String holderId,
long locksExpirationMillis,
DataSource dataSource,
SQLProvider provider) {
final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis);
sharedStateManager.setDataSource(dataSource);
sharedStateManager.setSqlProvider(provider);
try {
sharedStateManager.start();
return sharedStateManager;
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
public static JdbcSharedStateManager usingConnectionUrl(String holderId,
long locksExpirationMillis,
String jdbcConnectionUrl,
String jdbcDriverClass,
SQLProvider provider) {
final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis);
sharedStateManager.setJdbcConnectionUrl(jdbcConnectionUrl);
sharedStateManager.setJdbcDriverClass(jdbcDriverClass);
sharedStateManager.setSqlProvider(provider);
try {
sharedStateManager.start();
return sharedStateManager;
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
@Override
protected void createSchema() throws SQLException {
try {
createTable(sqlProvider.createNodeManagerStoreTableSQL(), sqlProvider.createNodeIdSQL(), sqlProvider.createStateSQL(), sqlProvider.createLiveLockSQL(), sqlProvider.createBackupLockSQL());
} catch (SQLException e) {
//no op: if a table already exists is not a problem in this case, the prepareStatements() call will fail right after it if the table is not correctly initialized
}
}
static JdbcLeaseLock createLiveLock(String holderId,
Connection connection,
SQLProvider sqlProvider,
long expirationMillis,
long maxAllowableMillisDiffFromDBtime) throws SQLException {
return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireLiveLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseLiveLockSQL()), connection.prepareStatement(sqlProvider.renewLiveLockSQL()), connection.prepareStatement(sqlProvider.isLiveLockedSQL()), connection.prepareStatement(sqlProvider.currentTimestampSQL()), expirationMillis, maxAllowableMillisDiffFromDBtime);
}
static JdbcLeaseLock createBackupLock(String holderId,
Connection connection,
SQLProvider sqlProvider,
long expirationMillis,
long maxAllowableMillisDiffFromDBtime) throws SQLException {
return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireBackupLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseBackupLockSQL()), connection.prepareStatement(sqlProvider.renewBackupLockSQL()), connection.prepareStatement(sqlProvider.isBackupLockedSQL()), connection.prepareStatement(sqlProvider.currentTimestampSQL()), expirationMillis, maxAllowableMillisDiffFromDBtime);
}
@Override
protected void prepareStatements() throws SQLException {
this.liveLock = createLiveLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis, 0);
this.backupLock = createBackupLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis, 0);
this.readNodeId = connection.prepareStatement(sqlProvider.readNodeIdSQL());
this.writeNodeId = connection.prepareStatement(sqlProvider.writeNodeIdSQL());
this.writeState = connection.prepareStatement(sqlProvider.writeStateSQL());
this.readState = connection.prepareStatement(sqlProvider.readStateSQL());
}
private JdbcSharedStateManager(String holderId, long lockExpirationMillis) {
this.holderId = holderId;
this.lockExpirationMillis = lockExpirationMillis;
}
@Override
public LeaseLock liveLock() {
return this.liveLock;
}
@Override
public LeaseLock backupLock() {
return this.backupLock;
}
private UUID rawReadNodeId() throws SQLException {
final PreparedStatement preparedStatement = this.readNodeId;
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (!resultSet.next()) {
return null;
} else {
final String nodeId = resultSet.getString(1);
if (nodeId != null) {
return new UUID(UUID.TYPE_TIME_BASED, UUID.stringToBytes(nodeId));
} else {
return null;
}
}
}
}
@Override
public UUID readNodeId() {
synchronized (connection) {
try {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
connection.setAutoCommit(true);
final UUID nodeId = rawReadNodeId();
return nodeId;
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
}
@Override
public void writeNodeId(UUID nodeId) {
synchronized (connection) {
try {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
connection.setAutoCommit(true);
rawWriteNodeId(nodeId);
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
}
private void rawWriteNodeId(UUID nodeId) throws SQLException {
final PreparedStatement preparedStatement = this.writeNodeId;
preparedStatement.setString(1, nodeId.toString());
if (preparedStatement.executeUpdate() != 1) {
throw new IllegalStateException("can't write NODE_ID on the JDBC Node Manager Store!");
}
}
@Override
public UUID setup(Supplier<? extends UUID> nodeIdFactory) {
//uses a single transaction to make everything
synchronized (connection) {
try {
final UUID nodeId;
connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
connection.setAutoCommit(false);
try {
UUID readNodeId = rawReadNodeId();
if (readNodeId == null) {
nodeId = nodeIdFactory.get();
rawWriteNodeId(nodeId);
} else {
nodeId = readNodeId;
}
} catch (SQLException e) {
connection.rollback();
connection.setAutoCommit(true);
throw e;
}
connection.commit();
connection.setAutoCommit(true);
return nodeId;
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
}
private static State decodeState(String s) {
if (s == null) {
return State.NOT_STARTED;
}
switch (s) {
case "L":
return State.LIVE;
case "F":
return State.FAILING_BACK;
case "P":
return State.PAUSED;
case "N":
return State.NOT_STARTED;
default:
throw new IllegalStateException("unknown state [" + s + "]");
}
}
private static String encodeState(State state) {
switch (state) {
case LIVE:
return "L";
case FAILING_BACK:
return "F";
case PAUSED:
return "P";
case NOT_STARTED:
return "N";
default:
throw new IllegalStateException("unknown state [" + state + "]");
}
}
@Override
public State readState() {
synchronized (connection) {
try {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
connection.setAutoCommit(true);
final State state;
final PreparedStatement preparedStatement = this.readState;
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (!resultSet.next()) {
state = State.FIRST_TIME_START;
} else {
state = decodeState(resultSet.getString(1));
}
}
return state;
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
}
@Override
public void writeState(State state) {
final String encodedState = encodeState(state);
synchronized (connection) {
try {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
connection.setAutoCommit(true);
final PreparedStatement preparedStatement = this.writeState;
preparedStatement.setString(1, encodedState);
if (preparedStatement.executeUpdate() != 1) {
throw new IllegalStateException("can't write STATE to the JDBC Node Manager Store!");
}
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
}
@Override
public void stop() throws SQLException {
//release all the managed resources inside the connection lock
if (sqlProvider.closeConnectionOnShutdown()) {
synchronized (connection) {
this.readNodeId.close();
this.writeNodeId.close();
this.readState.close();
this.writeState.close();
this.liveLock.close();
this.backupLock.close();
super.stop();
}
}
}
@Override
public void close() throws SQLException {
stop();
}
}

View File

@ -0,0 +1,151 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl.jdbc;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
/**
* It represents a lock that can't be held more than {@link #expirationMillis()} without being renewed.
*
* <p>
* An implementor must provide implicitly the caller identity to contextualize each operation (eg {@link JdbcLeaseLock}
* uses one caller per instance)
*/
interface LeaseLock extends AutoCloseable {
enum AcquireResult {
Timeout, Exit, Done
}
interface ExitCondition {
/**
* @return true as long as we should keep running
*/
boolean keepRunning();
}
interface Pauser {
void idle();
static Pauser sleep(long idleTime, TimeUnit timeUnit) {
final long idleNanos = timeUnit.toNanos(idleTime);
//can fail spuriously but doesn't throw any InterruptedException
return () -> LockSupport.parkNanos(idleNanos);
}
static Pauser noWait() {
return () -> {
};
}
}
/**
* The expiration in milliseconds from the last valid acquisition/renew.
*/
default long expirationMillis() {
return Long.MAX_VALUE;
}
/**
* It extends the lock expiration (if held) to {@link System#currentTimeMillis()} + {@link #expirationMillis()}.
*
* @return {@code true} if the expiration has been moved on, {@code false} otherwise
*/
default boolean renew() {
return true;
}
/**
* Not reentrant lock acquisition operation.
* The lock can be acquired if is not held by anyone (including the caller) or has an expired ownership.
*
* @return {@code true} if has been acquired, {@code false} otherwise
*/
boolean tryAcquire();
/**
* Not reentrant lock acquisition operation (ie {@link #tryAcquire()}).
* It tries to acquire the lock until will succeed (ie {@link AcquireResult#Done})or got interrupted (ie {@link AcquireResult#Exit}).
* After each failed attempt is performed a {@link Pauser#idle} call.
*/
default AcquireResult tryAcquire(ExitCondition exitCondition, Pauser pauser) {
while (exitCondition.keepRunning()) {
if (tryAcquire()) {
return AcquireResult.Done;
} else {
pauser.idle();
}
}
return AcquireResult.Exit;
}
/**
* Not reentrant lock acquisition operation (ie {@link #tryAcquire()}).
* It tries to acquire the lock until will succeed (ie {@link AcquireResult#Done}), got interrupted (ie {@link AcquireResult#Exit})
* or exceed {@code tryAcquireTimeoutMillis}.
* After each failed attempt is performed a {@link Pauser#idle} call.
* If the specified timeout is <=0 then it behaves as {@link #tryAcquire(ExitCondition, Pauser)}.
*/
default AcquireResult tryAcquire(long tryAcquireTimeoutMillis, Pauser pauser, ExitCondition exitCondition) {
if (tryAcquireTimeoutMillis < 0) {
return tryAcquire(exitCondition, pauser);
}
final long timeoutInNanosecond = TimeUnit.MILLISECONDS.toNanos(tryAcquireTimeoutMillis);
final long startAcquire = System.nanoTime();
while (exitCondition.keepRunning()) {
if (tryAcquire()) {
return AcquireResult.Done;
} else if (System.nanoTime() - startAcquire >= timeoutInNanosecond) {
return AcquireResult.Timeout;
} else {
pauser.idle();
//check before doing anything if time is expired
if (System.nanoTime() - startAcquire >= timeoutInNanosecond) {
return AcquireResult.Timeout;
}
}
}
return AcquireResult.Exit;
}
/**
* @return {@code true} if there is a valid (ie not expired) owner, {@code false} otherwise
*/
boolean isHeld();
/**
* @return {@code true} if the caller is a valid (ie not expired) owner, {@code false} otherwise
*/
boolean isHeldByCaller();
/**
* It release the lock itself and all the resources used by it (eg connections, file handlers)
*/
@Override
default void close() throws Exception {
release();
}
/**
* Perform the release if this lock is held by the caller.
*/
void release();
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl.jdbc;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
/**
* {@link LeaseLock} holder that allows to schedule a {@link LeaseLock#renew} task with a fixed {@link #renewPeriodMillis()} delay.
*/
interface ScheduledLeaseLock extends ActiveMQComponent {
LeaseLock lock();
long renewPeriodMillis();
static ScheduledLeaseLock of(ScheduledExecutorService scheduledExecutorService,
ArtemisExecutor executor,
String lockName,
LeaseLock lock,
long renewPeriodMillis,
IOCriticalErrorListener ioCriticalErrorListener) {
return new ActiveMQScheduledLeaseLock(scheduledExecutorService, executor, lockName, lock, renewPeriodMillis, ioCriticalErrorListener);
}
}

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl.jdbc;
import java.util.function.Supplier;
import org.apache.activemq.artemis.utils.UUID;
/**
* Facade to abstract the operations on the shared state (inter-process and/or inter-thread) necessary to coordinate broker nodes.
*/
interface SharedStateManager extends AutoCloseable {
enum State {
LIVE, PAUSED, FAILING_BACK, NOT_STARTED, FIRST_TIME_START
}
LeaseLock liveLock();
LeaseLock backupLock();
UUID readNodeId();
void writeNodeId(UUID nodeId);
/**
* Purpose of this method is to setup the environment to provide a shared state between live/backup servers.
* That means:
* - check if a shared state exist and create it/wait for it if not
* - check if a nodeId exists and create it if not
*
* @param nodeIdFactory used to create the nodeId if needed
* @return the newly created NodeId or the old one if already present
*/
UUID setup(Supplier<? extends UUID> nodeIdFactory);
State readState();
void writeState(State state);
@Override
default void close() throws Exception {
}
}

View File

@ -1884,6 +1884,27 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="jdbc-lock-acquisition-timeout" type="xsd:int" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
The max allowed time in milliseconds while trying to acquire a JDBC lock.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="jdbc-lock-renew-period" type="xsd:int" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
The period in milliseconds of the keep alive service of a JDBC lock.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="jdbc-lock-expiration" type="xsd:int" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
The time in milliseconds a JDBC lock is considered valid without keeping it alive.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:all>
</xsd:complexType>

View File

@ -0,0 +1,231 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl.jdbc;
import java.sql.SQLException;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class JdbcLeaseLockTest {
private static final long DEFAULT_LOCK_EXPIRATION_MILLIS = TimeUnit.SECONDS.toMillis(10);
private static final SQLProvider SQL_PROVIDER = new DerbySQLProvider.Factory().create(ActiveMQDefaultConfiguration.getDefaultNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER);
private static final String JDBC_URL = "jdbc:derby:memory:server_lock_db;create=true";
private static final String DRIVER_CLASS_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
private JdbcSharedStateManager jdbcSharedStateManager;
private LeaseLock lock() {
return lock(DEFAULT_LOCK_EXPIRATION_MILLIS);
}
private LeaseLock lock(long acquireMillis) {
try {
return JdbcSharedStateManager.createLiveLock(UUID.randomUUID().toString(), jdbcSharedStateManager.getConnection(), SQL_PROVIDER, acquireMillis, 0);
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
@Before
public void createLockTable() {
jdbcSharedStateManager = JdbcSharedStateManager.usingConnectionUrl(UUID.randomUUID().toString(), DEFAULT_LOCK_EXPIRATION_MILLIS, JDBC_URL, DRIVER_CLASS_NAME, SQL_PROVIDER);
}
@After
public void dropLockTable() throws Exception {
jdbcSharedStateManager.destroy();
jdbcSharedStateManager.close();
}
@Test
public void shouldAcquireLock() {
final LeaseLock lock = lock();
final boolean acquired = lock.tryAcquire();
Assert.assertTrue("Must acquire the lock!", acquired);
try {
Assert.assertTrue("The lock is been held by the caller!", lock.isHeldByCaller());
} finally {
lock.release();
}
}
@Test
public void shouldNotAcquireLockWhenAlreadyHeldByOthers() {
final LeaseLock lock = lock();
Assert.assertTrue("Must acquire the lock", lock.tryAcquire());
try {
Assert.assertTrue("Lock held by the caller", lock.isHeldByCaller());
final LeaseLock failingLock = lock();
Assert.assertFalse("lock already held by other", failingLock.tryAcquire());
Assert.assertFalse("lock already held by other", failingLock.isHeldByCaller());
Assert.assertTrue("lock already held by other", failingLock.isHeld());
} finally {
lock.release();
}
}
@Test
public void shouldNotAcquireLockTwice() {
final LeaseLock lock = lock();
Assert.assertTrue("Must acquire the lock", lock.tryAcquire());
try {
Assert.assertFalse("lock already acquired", lock.tryAcquire());
} finally {
lock.release();
}
}
@Test
public void shouldNotCorruptGuardedState() throws InterruptedException {
final AtomicLong sharedState = new AtomicLong(0);
final int producers = 2;
final int writesPerProducer = 10;
final long idleMillis = 1000;
final long millisToAcquireLock = writesPerProducer * (producers - 1) * idleMillis;
final LeaseLock.Pauser pauser = LeaseLock.Pauser.sleep(idleMillis, TimeUnit.MILLISECONDS);
final CountDownLatch finished = new CountDownLatch(producers);
final LeaseLock[] locks = new LeaseLock[producers];
final AtomicInteger lockIndex = new AtomicInteger(0);
final Runnable producerTask = () -> {
final LeaseLock lock = locks[lockIndex.getAndIncrement()];
try {
for (int i = 0; i < writesPerProducer; i++) {
final LeaseLock.AcquireResult acquireResult = lock.tryAcquire(millisToAcquireLock, pauser, () -> true);
if (acquireResult != LeaseLock.AcquireResult.Done) {
throw new IllegalStateException(acquireResult + " from " + Thread.currentThread());
}
//avoid the atomic getAndIncrement operation on purpose
sharedState.lazySet(sharedState.get() + 1);
lock.release();
}
} finally {
finished.countDown();
}
};
final Thread[] producerThreads = new Thread[producers];
for (int i = 0; i < producers; i++) {
locks[i] = lock();
producerThreads[i] = new Thread(producerTask);
}
Stream.of(producerThreads).forEach(Thread::start);
final long maxTestTime = millisToAcquireLock * writesPerProducer * producers;
Assert.assertTrue("Each producers must complete the writes", finished.await(maxTestTime, TimeUnit.MILLISECONDS));
Assert.assertEquals("locks hasn't mutual excluded producers", writesPerProducer * producers, sharedState.get());
}
@Test
public void shouldAcquireExpiredLock() throws InterruptedException {
final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1));
Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
try {
Thread.sleep(lock.expirationMillis() * 2);
Assert.assertFalse("lock is already expired", lock.isHeldByCaller());
Assert.assertFalse("lock is already expired", lock.isHeld());
Assert.assertTrue("lock is already expired", lock.tryAcquire());
} finally {
lock.release();
}
}
@Test
public void shouldOtherAcquireExpiredLock() throws InterruptedException {
final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1));
Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
try {
Thread.sleep(lock.expirationMillis() * 2);
Assert.assertFalse("lock is already expired", lock.isHeldByCaller());
Assert.assertFalse("lock is already expired", lock.isHeld());
final LeaseLock otherLock = lock(TimeUnit.SECONDS.toMillis(10));
try {
Assert.assertTrue("lock is already expired", otherLock.tryAcquire());
} finally {
otherLock.release();
}
} finally {
lock.release();
}
}
@Test
public void shouldRenewAcquiredLock() throws InterruptedException {
final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(10));
Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
try {
Assert.assertTrue("lock is owned", lock.renew());
} finally {
lock.release();
}
}
@Test
public void shouldNotRenewReleasedLock() throws InterruptedException {
final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(10));
Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
lock.release();
Assert.assertFalse("lock is already released", lock.isHeldByCaller());
Assert.assertFalse("lock is already released", lock.isHeld());
Assert.assertFalse("lock is already released", lock.renew());
}
@Test
public void shouldRenewExpiredLockNotAcquiredByOthers() throws InterruptedException {
final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1));
Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
try {
Thread.sleep(lock.expirationMillis() * 2);
Assert.assertFalse("lock is already expired", lock.isHeldByCaller());
Assert.assertFalse("lock is already expired", lock.isHeld());
Assert.assertTrue("lock is owned", lock.renew());
} finally {
lock.release();
}
}
@Test
public void shouldNotRenewLockAcquiredByOthers() throws InterruptedException {
final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1));
Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
try {
Thread.sleep(lock.expirationMillis() * 2);
Assert.assertFalse("lock is already expired", lock.isHeldByCaller());
Assert.assertFalse("lock is already expired", lock.isHeld());
final LeaseLock otherLock = lock(TimeUnit.SECONDS.toMillis(10));
Assert.assertTrue("lock is already expired", otherLock.tryAcquire());
try {
Assert.assertFalse("lock is owned by others", lock.renew());
} finally {
otherLock.release();
}
} finally {
lock.release();
}
}
}

View File

@ -452,6 +452,21 @@ To configure Apache ActiveMQ Artemis to use a database for persisting messages a
The JDBC network connection timeout in milliseconds. The default value
is 20000 milliseconds (ie 20 seconds).
- `jdbc-lock-acquisition-timeout`
The max allowed time in milliseconds while trying to acquire a JDBC lock. The default value
is 60000 milliseconds (ie 60 seconds).
- `jdbc-lock-renew-period`
The period in milliseconds of the keep alive service of a JDBC lock. The default value
is 2000 milliseconds (ie 2 seconds).
- `jdbc-lock-expiration`
The time in milliseconds a JDBC lock is considered valid without keeping it alive. The default value
is 20000 milliseconds (ie 20 seconds).
Note that some DBMS (e.g. Oracle, 30 chars) have restrictions on the size of table names, this should be taken into consideration when configuring table names for the Artemis database store, pay particular attention to the page store table name, which can be appended with a unique ID of up to 20 characters. (for Oracle this would mean configuring a page-store-table-name of max size of 10 chars).