ARTEMIS-1640 JDBC NodeManager tests have to be customizable to run on different DBMS

ActiveMQTestBase has been enhanced to expose the Database storage configuration and by adding specific JDBC HA configuration properties.
JdbcLeaseLockTest and NettyFailoverTests have been changed in order to make use of the JDBC configuration provided by ActiveMQTestBase.
JdbcNodeManager has been made restartable to allow failover tests to reuse it after a failover.
This commit is contained in:
Francesco Nigro 2018-01-26 13:24:52 +01:00
parent d02a1423ba
commit 52e594d218
5 changed files with 137 additions and 51 deletions

View File

@ -44,9 +44,12 @@ public final class JdbcNodeManager extends NodeManager {
private static final Logger logger = Logger.getLogger(JdbcNodeManager.class);
private static final long MAX_PAUSE_MILLIS = 2000L;
private final SharedStateManager sharedStateManager;
private final ScheduledLeaseLock scheduledLiveLock;
private final ScheduledLeaseLock scheduledBackupLock;
private final Supplier<? extends SharedStateManager> sharedStateManagerFactory;
private final Supplier<? extends ScheduledLeaseLock> scheduledLiveLockFactory;
private final Supplier<? extends ScheduledLeaseLock> scheduledBackupLockFactory;
private SharedStateManager sharedStateManager;
private ScheduledLeaseLock scheduledLiveLock;
private ScheduledLeaseLock scheduledBackupLock;
private final long lockRenewPeriodMillis;
private final long lockAcquisitionTimeoutMillis;
private volatile boolean interrupted = false;
@ -82,7 +85,14 @@ public final class JdbcNodeManager extends NodeManager {
ScheduledExecutorService scheduledExecutorService,
ExecutorFactory executorFactory,
IOCriticalErrorListener ioCriticalErrorListener) {
return new JdbcNodeManager(JdbcSharedStateManager.usingDataSource(brokerId, lockExpirationMillis, dataSource, provider), false, lockRenewPeriodMillis, lockAcquisitionTimeoutMillis, scheduledExecutorService, executorFactory, ioCriticalErrorListener);
return new JdbcNodeManager(
() -> JdbcSharedStateManager.usingDataSource(brokerId, lockExpirationMillis, dataSource, provider),
false,
lockRenewPeriodMillis,
lockAcquisitionTimeoutMillis,
scheduledExecutorService,
executorFactory,
ioCriticalErrorListener);
}
public static JdbcNodeManager usingConnectionUrl(String brokerId,
@ -95,10 +105,17 @@ public final class JdbcNodeManager extends NodeManager {
ScheduledExecutorService scheduledExecutorService,
ExecutorFactory executorFactory,
IOCriticalErrorListener ioCriticalErrorListener) {
return new JdbcNodeManager(JdbcSharedStateManager.usingConnectionUrl(brokerId, lockExpirationMillis, jdbcUrl, driverClass, provider), false, lockRenewPeriodMillis, lockAcquisitionTimeoutMillis, scheduledExecutorService, executorFactory, ioCriticalErrorListener);
return new JdbcNodeManager(
() -> JdbcSharedStateManager.usingConnectionUrl(brokerId, lockExpirationMillis, jdbcUrl, driverClass, provider),
false,
lockRenewPeriodMillis,
lockAcquisitionTimeoutMillis,
scheduledExecutorService,
executorFactory,
ioCriticalErrorListener);
}
private JdbcNodeManager(final SharedStateManager sharedStateManager,
private JdbcNodeManager(Supplier<? extends SharedStateManager> sharedStateManagerFactory,
boolean replicatedBackup,
long lockRenewPeriodMillis,
long lockAcquisitionTimeoutMillis,
@ -109,10 +126,26 @@ public final class JdbcNodeManager extends NodeManager {
this.lockAcquisitionTimeoutMillis = lockAcquisitionTimeoutMillis;
this.lockRenewPeriodMillis = lockRenewPeriodMillis;
this.pauser = LeaseLock.Pauser.sleep(Math.min(this.lockRenewPeriodMillis, MAX_PAUSE_MILLIS), TimeUnit.MILLISECONDS);
this.sharedStateManager = sharedStateManager;
this.scheduledLiveLock = ScheduledLeaseLock.of(scheduledExecutorService, executorFactory != null ? executorFactory.getExecutor() : null, "live", this.sharedStateManager.liveLock(), lockRenewPeriodMillis, ioCriticalErrorListener);
this.scheduledBackupLock = ScheduledLeaseLock.of(scheduledExecutorService, executorFactory != null ? executorFactory.getExecutor() : null, "backup", this.sharedStateManager.backupLock(), lockRenewPeriodMillis, ioCriticalErrorListener);
this.sharedStateManagerFactory = sharedStateManagerFactory;
this.scheduledLiveLockFactory = () -> ScheduledLeaseLock.of(
scheduledExecutorService,
executorFactory != null ? executorFactory.getExecutor() : null,
"live",
this.sharedStateManager.liveLock(),
lockRenewPeriodMillis,
ioCriticalErrorListener);
this.scheduledBackupLockFactory = () -> ScheduledLeaseLock.of(
scheduledExecutorService,
executorFactory != null ?
executorFactory.getExecutor() : null,
"backup",
this.sharedStateManager.backupLock(),
lockRenewPeriodMillis,
ioCriticalErrorListener);
this.ioCriticalErrorListener = ioCriticalErrorListener;
this.sharedStateManager = null;
this.scheduledLiveLock = null;
this.scheduledBackupLock = null;
}
@Override
@ -122,13 +155,19 @@ public final class JdbcNodeManager extends NodeManager {
if (isStarted()) {
return;
}
this.sharedStateManager = sharedStateManagerFactory.get();
if (!replicatedBackup) {
final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID);
setUUID(nodeId);
}
this.scheduledLiveLock = scheduledLiveLockFactory.get();
this.scheduledBackupLock = scheduledBackupLockFactory.get();
super.start();
}
} catch (IllegalStateException e) {
this.sharedStateManager = null;
this.scheduledLiveLock = null;
this.scheduledBackupLock = null;
if (this.ioCriticalErrorListener != null) {
this.ioCriticalErrorListener.onIOException(e, "Failed to setup the JdbcNodeManager", null);
}
@ -145,6 +184,9 @@ public final class JdbcNodeManager extends NodeManager {
} finally {
super.stop();
this.sharedStateManager.close();
this.sharedStateManager = null;
this.scheduledLiveLock = null;
this.scheduledBackupLock = null;
}
}
}

View File

@ -25,31 +25,34 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider.Factory.SQLDialect.DERBY;
public class JdbcLeaseLockTest extends ActiveMQTestBase {
public class JdbcLeaseLockTest {
private static final long DEFAULT_LOCK_EXPIRATION_MILLIS = TimeUnit.SECONDS.toMillis(10);
private static final SQLProvider SQL_PROVIDER = new PropertySQLProvider.Factory(DERBY).create(ActiveMQDefaultConfiguration.getDefaultNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER);
private static final String JDBC_URL = "jdbc:derby:memory:server_lock_db;create=true";
private static final String DRIVER_CLASS_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
private JdbcSharedStateManager jdbcSharedStateManager;
private DatabaseStorageConfiguration dbConf;
private SQLProvider sqlProvider;
private LeaseLock lock() {
return lock(DEFAULT_LOCK_EXPIRATION_MILLIS);
return lock(dbConf.getJdbcLockExpirationMillis());
}
private LeaseLock lock(long acquireMillis) {
try {
return JdbcSharedStateManager.createLiveLock(UUID.randomUUID().toString(), jdbcSharedStateManager.getConnection(), SQL_PROVIDER, acquireMillis, 0);
return JdbcSharedStateManager
.createLiveLock(
UUID.randomUUID().toString(),
jdbcSharedStateManager.getConnection(),
sqlProvider,
acquireMillis,
0);
} catch (SQLException e) {
throw new IllegalStateException(e);
}
@ -57,7 +60,18 @@ public class JdbcLeaseLockTest {
@Before
public void createLockTable() {
jdbcSharedStateManager = JdbcSharedStateManager.usingConnectionUrl(UUID.randomUUID().toString(), DEFAULT_LOCK_EXPIRATION_MILLIS, JDBC_URL, DRIVER_CLASS_NAME, SQL_PROVIDER);
dbConf = createDefaultDatabaseStorageConfiguration();
sqlProvider = JDBCUtils.getSQLProvider(
dbConf.getJdbcDriverClassName(),
dbConf.getNodeManagerStoreTableName(),
SQLProvider.DatabaseStoreType.NODE_MANAGER);
jdbcSharedStateManager = JdbcSharedStateManager
.usingConnectionUrl(
UUID.randomUUID().toString(),
dbConf.getJdbcLockExpirationMillis(),
dbConf.getJdbcConnectionUrl(),
dbConf.getJdbcDriverClassName(),
sqlProvider);
}
@After

View File

@ -60,6 +60,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@ -137,11 +138,11 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.CleanupSystemPropertiesRule;
import org.apache.activemq.artemis.utils.Env;
import org.apache.activemq.artemis.utils.FileUtil;
import org.apache.activemq.artemis.utils.ThreadDumpUtil;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.ThreadDumpUtil;
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.jboss.logging.Logger;
import org.junit.After;
import org.junit.Assert;
@ -466,6 +467,10 @@ public abstract class ActiveMQTestBase extends Assert {
}
protected void setDBStoreType(Configuration configuration) {
configuration.setStoreConfiguration(createDefaultDatabaseStorageConfiguration());
}
protected DatabaseStorageConfiguration createDefaultDatabaseStorageConfiguration() {
DatabaseStorageConfiguration dbStorageConfiguration = new DatabaseStorageConfiguration();
dbStorageConfiguration.setJdbcConnectionUrl(getTestJDBCConnectionUrl());
dbStorageConfiguration.setBindingsTableName("BINDINGS");
@ -473,8 +478,22 @@ public abstract class ActiveMQTestBase extends Assert {
dbStorageConfiguration.setLargeMessageTableName("LARGE_MESSAGE");
dbStorageConfiguration.setPageStoreTableName("PAGE_STORE");
dbStorageConfiguration.setJdbcDriverClassName(getJDBCClassName());
dbStorageConfiguration.setJdbcLockAcquisitionTimeoutMillis(getJdbcLockAcquisitionTimeoutMillis());
dbStorageConfiguration.setJdbcLockExpirationMillis(getJdbcLockExpirationMillis());
dbStorageConfiguration.setJdbcLockRenewPeriodMillis(getJdbcLockRenewPeriodMillis());
return dbStorageConfiguration;
}
configuration.setStoreConfiguration(dbStorageConfiguration);
protected long getJdbcLockAcquisitionTimeoutMillis() {
return Long.getLong("jdbc.lock.acquisition", ActiveMQDefaultConfiguration.getDefaultJdbcLockAcquisitionTimeoutMillis());
}
protected long getJdbcLockExpirationMillis() {
return Long.getLong("jdbc.lock.expiration", ActiveMQDefaultConfiguration.getDefaultJdbcLockExpirationMillis());
}
protected long getJdbcLockRenewPeriodMillis() {
return Long.getLong("jdbc.lock.renew", ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis());
}
public void destroyTables(List<String> tableNames) throws Exception {

View File

@ -112,7 +112,7 @@ public abstract class FailoverTestBase extends ActiveMQTestBase {
liveServer.setIdentity(this.getClass().getSimpleName() + "/liveServer");
}
protected TestableServer createTestableServer(Configuration config) {
protected TestableServer createTestableServer(Configuration config) throws Exception {
boolean isBackup = config.getHAPolicyConfiguration() instanceof ReplicaPolicyConfiguration || config.getHAPolicyConfiguration() instanceof SharedStoreSlavePolicyConfiguration;
return new SameProcessActiveMQServer(createInVMFailoverServer(true, config, nodeManager, isBackup ? 2 : 1));
}
@ -156,7 +156,7 @@ public abstract class FailoverTestBase extends ActiveMQTestBase {
/**
* Override this if is needed a different implementation of {@link NodeManager} to be used into {@link #createConfigs()}.
*/
protected NodeManager createNodeManager() {
protected NodeManager createNodeManager() throws Exception {
return new InVMNodeManager(false);
}

View File

@ -16,16 +16,16 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster.failover;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@ -34,13 +34,17 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager;
import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer;
import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.hamcrest.core.Is;
import org.junit.After;
@ -50,18 +54,9 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider.Factory.SQLDialect.DERBY;
@RunWith(Parameterized.class)
public class NettyFailoverTest extends FailoverTest {
private static final long JDBC_LOCK_EXPIRATION_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockExpirationMillis();
private static final long JDBC_LOCK_RENEW_PERIOD_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis();
private static final long JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockAcquisitionTimeoutMillis();
private static final SQLProvider SQL_PROVIDER = new PropertySQLProvider.Factory(DERBY).create(ActiveMQDefaultConfiguration.getDefaultNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER);
private static final String JDBC_URL = "jdbc:derby:memory:server_lock_db;create=true";
private static final String DRIVER_CLASS_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
public enum NodeManagerType {
InVM, Jdbc
}
@ -84,8 +79,8 @@ public class NettyFailoverTest extends FailoverTest {
return getNettyConnectorTransportConfiguration(live);
}
private ScheduledExecutorService scheduledExecutorService;
private ExecutorService executor;
private List<ScheduledExecutorService> scheduledExecutorServices = new ArrayList<>();
private List<ExecutorService> executors = new ArrayList<>();
@Override
protected NodeManager createReplicatedBackupNodeManager(Configuration backupConfig) {
@ -94,23 +89,25 @@ public class NettyFailoverTest extends FailoverTest {
}
@Override
protected NodeManager createNodeManager() {
protected NodeManager createNodeManager() throws Exception {
switch (nodeManagerType) {
case InVM:
return new InVMNodeManager(false);
case Jdbc:
//It can uses an in memory JavaDB: the failover tests are in process
final ThreadFactory daemonThreadFactory = t -> {
final Thread th = new Thread(t);
th.setDaemon(true);
return th;
};
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(daemonThreadFactory);
executor = Executors.newFixedThreadPool(2, daemonThreadFactory);
final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(daemonThreadFactory);
scheduledExecutorServices.add(scheduledExecutorService);
final ExecutorService executor = Executors.newFixedThreadPool(2, daemonThreadFactory);
executors.add(executor);
final DatabaseStorageConfiguration dbConf = createDefaultDatabaseStorageConfiguration();
final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
return JdbcNodeManager.usingConnectionUrl(UUID.randomUUID().toString(), JDBC_LOCK_EXPIRATION_MILLIS, JDBC_LOCK_RENEW_PERIOD_MILLIS, JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS, JDBC_URL, DRIVER_CLASS_NAME, SQL_PROVIDER, scheduledExecutorService, executorFactory, (code, message, file) -> {
return JdbcNodeManager.with(dbConf, scheduledExecutorService, executorFactory, (code, message, file) -> {
code.printStackTrace();
Assert.fail(message);
});
@ -119,13 +116,27 @@ public class NettyFailoverTest extends FailoverTest {
}
}
@Override
protected TestableServer createTestableServer(Configuration config) throws Exception {
final boolean isBackup = config.getHAPolicyConfiguration() instanceof ReplicaPolicyConfiguration || config.getHAPolicyConfiguration() instanceof SharedStoreSlavePolicyConfiguration;
NodeManager nodeManager = this.nodeManager;
//create a separate NodeManager for the backup
if (isBackup && nodeManagerType == NodeManagerType.Jdbc) {
nodeManager = createNodeManager();
}
return new SameProcessActiveMQServer(createInVMFailoverServer(true, config, nodeManager, isBackup ? 2 : 1));
}
@After
public void shutDownExecutors() {
if (scheduledExecutorService != null) {
executor.shutdown();
scheduledExecutorService.shutdown();
this.executor = null;
this.scheduledExecutorService = null;
if (!scheduledExecutorServices.isEmpty()) {
ThreadLeakCheckRule.addKownThread("oracle.jdbc.driver.BlockSource.ThreadedCachingBlockSource.BlockReleaser");
executors.forEach(ExecutorService::shutdown);
scheduledExecutorServices.forEach(ExecutorService::shutdown);
executors.clear();
scheduledExecutorServices.clear();
}
}