From 52e594d21890824d526d57e8b2e5bbbd1aeb7162 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Fri, 26 Jan 2018 13:24:52 +0100 Subject: [PATCH] ARTEMIS-1640 JDBC NodeManager tests have to be customizable to run on different DBMS ActiveMQTestBase has been enhanced to expose the Database storage configuration and by adding specific JDBC HA configuration properties. JdbcLeaseLockTest and NettyFailoverTests have been changed in order to make use of the JDBC configuration provided by ActiveMQTestBase. JdbcNodeManager has been made restartable to allow failover tests to reuse it after a failover. --- .../server/impl/jdbc/JdbcNodeManager.java | 60 +++++++++++++++--- .../server/impl/jdbc/JdbcLeaseLockTest.java | 38 ++++++++---- .../artemis/tests/util/ActiveMQTestBase.java | 25 +++++++- .../cluster/failover/FailoverTestBase.java | 4 +- .../cluster/failover/NettyFailoverTest.java | 61 +++++++++++-------- 5 files changed, 137 insertions(+), 51 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java index b2d9d3fc2b..2360df6346 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java @@ -44,9 +44,12 @@ public final class JdbcNodeManager extends NodeManager { private static final Logger logger = Logger.getLogger(JdbcNodeManager.class); private static final long MAX_PAUSE_MILLIS = 2000L; - private final SharedStateManager sharedStateManager; - private final ScheduledLeaseLock scheduledLiveLock; - private final ScheduledLeaseLock scheduledBackupLock; + private final Supplier sharedStateManagerFactory; + private final Supplier scheduledLiveLockFactory; + private final Supplier scheduledBackupLockFactory; + private SharedStateManager sharedStateManager; + private ScheduledLeaseLock scheduledLiveLock; + private ScheduledLeaseLock scheduledBackupLock; private final long lockRenewPeriodMillis; private final long lockAcquisitionTimeoutMillis; private volatile boolean interrupted = false; @@ -82,7 +85,14 @@ public final class JdbcNodeManager extends NodeManager { ScheduledExecutorService scheduledExecutorService, ExecutorFactory executorFactory, IOCriticalErrorListener ioCriticalErrorListener) { - return new JdbcNodeManager(JdbcSharedStateManager.usingDataSource(brokerId, lockExpirationMillis, dataSource, provider), false, lockRenewPeriodMillis, lockAcquisitionTimeoutMillis, scheduledExecutorService, executorFactory, ioCriticalErrorListener); + return new JdbcNodeManager( + () -> JdbcSharedStateManager.usingDataSource(brokerId, lockExpirationMillis, dataSource, provider), + false, + lockRenewPeriodMillis, + lockAcquisitionTimeoutMillis, + scheduledExecutorService, + executorFactory, + ioCriticalErrorListener); } public static JdbcNodeManager usingConnectionUrl(String brokerId, @@ -95,10 +105,17 @@ public final class JdbcNodeManager extends NodeManager { ScheduledExecutorService scheduledExecutorService, ExecutorFactory executorFactory, IOCriticalErrorListener ioCriticalErrorListener) { - return new JdbcNodeManager(JdbcSharedStateManager.usingConnectionUrl(brokerId, lockExpirationMillis, jdbcUrl, driverClass, provider), false, lockRenewPeriodMillis, lockAcquisitionTimeoutMillis, scheduledExecutorService, executorFactory, ioCriticalErrorListener); + return new JdbcNodeManager( + () -> JdbcSharedStateManager.usingConnectionUrl(brokerId, lockExpirationMillis, jdbcUrl, driverClass, provider), + false, + lockRenewPeriodMillis, + lockAcquisitionTimeoutMillis, + scheduledExecutorService, + executorFactory, + ioCriticalErrorListener); } - private JdbcNodeManager(final SharedStateManager sharedStateManager, + private JdbcNodeManager(Supplier sharedStateManagerFactory, boolean replicatedBackup, long lockRenewPeriodMillis, long lockAcquisitionTimeoutMillis, @@ -109,10 +126,26 @@ public final class JdbcNodeManager extends NodeManager { this.lockAcquisitionTimeoutMillis = lockAcquisitionTimeoutMillis; this.lockRenewPeriodMillis = lockRenewPeriodMillis; this.pauser = LeaseLock.Pauser.sleep(Math.min(this.lockRenewPeriodMillis, MAX_PAUSE_MILLIS), TimeUnit.MILLISECONDS); - this.sharedStateManager = sharedStateManager; - this.scheduledLiveLock = ScheduledLeaseLock.of(scheduledExecutorService, executorFactory != null ? executorFactory.getExecutor() : null, "live", this.sharedStateManager.liveLock(), lockRenewPeriodMillis, ioCriticalErrorListener); - this.scheduledBackupLock = ScheduledLeaseLock.of(scheduledExecutorService, executorFactory != null ? executorFactory.getExecutor() : null, "backup", this.sharedStateManager.backupLock(), lockRenewPeriodMillis, ioCriticalErrorListener); + this.sharedStateManagerFactory = sharedStateManagerFactory; + this.scheduledLiveLockFactory = () -> ScheduledLeaseLock.of( + scheduledExecutorService, + executorFactory != null ? executorFactory.getExecutor() : null, + "live", + this.sharedStateManager.liveLock(), + lockRenewPeriodMillis, + ioCriticalErrorListener); + this.scheduledBackupLockFactory = () -> ScheduledLeaseLock.of( + scheduledExecutorService, + executorFactory != null ? + executorFactory.getExecutor() : null, + "backup", + this.sharedStateManager.backupLock(), + lockRenewPeriodMillis, + ioCriticalErrorListener); this.ioCriticalErrorListener = ioCriticalErrorListener; + this.sharedStateManager = null; + this.scheduledLiveLock = null; + this.scheduledBackupLock = null; } @Override @@ -122,13 +155,19 @@ public final class JdbcNodeManager extends NodeManager { if (isStarted()) { return; } + this.sharedStateManager = sharedStateManagerFactory.get(); if (!replicatedBackup) { final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID); setUUID(nodeId); } + this.scheduledLiveLock = scheduledLiveLockFactory.get(); + this.scheduledBackupLock = scheduledBackupLockFactory.get(); super.start(); } } catch (IllegalStateException e) { + this.sharedStateManager = null; + this.scheduledLiveLock = null; + this.scheduledBackupLock = null; if (this.ioCriticalErrorListener != null) { this.ioCriticalErrorListener.onIOException(e, "Failed to setup the JdbcNodeManager", null); } @@ -145,6 +184,9 @@ public final class JdbcNodeManager extends NodeManager { } finally { super.stop(); this.sharedStateManager.close(); + this.sharedStateManager = null; + this.scheduledLiveLock = null; + this.scheduledBackupLock = null; } } } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java index 201db6a8cb..d4b63de0df 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java @@ -25,31 +25,34 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; -import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; -import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider; +import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration; +import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import static org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider.Factory.SQLDialect.DERBY; +public class JdbcLeaseLockTest extends ActiveMQTestBase { -public class JdbcLeaseLockTest { - - private static final long DEFAULT_LOCK_EXPIRATION_MILLIS = TimeUnit.SECONDS.toMillis(10); - private static final SQLProvider SQL_PROVIDER = new PropertySQLProvider.Factory(DERBY).create(ActiveMQDefaultConfiguration.getDefaultNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER); - private static final String JDBC_URL = "jdbc:derby:memory:server_lock_db;create=true"; - private static final String DRIVER_CLASS_NAME = "org.apache.derby.jdbc.EmbeddedDriver"; private JdbcSharedStateManager jdbcSharedStateManager; + private DatabaseStorageConfiguration dbConf; + private SQLProvider sqlProvider; private LeaseLock lock() { - return lock(DEFAULT_LOCK_EXPIRATION_MILLIS); + return lock(dbConf.getJdbcLockExpirationMillis()); } private LeaseLock lock(long acquireMillis) { try { - return JdbcSharedStateManager.createLiveLock(UUID.randomUUID().toString(), jdbcSharedStateManager.getConnection(), SQL_PROVIDER, acquireMillis, 0); + return JdbcSharedStateManager + .createLiveLock( + UUID.randomUUID().toString(), + jdbcSharedStateManager.getConnection(), + sqlProvider, + acquireMillis, + 0); } catch (SQLException e) { throw new IllegalStateException(e); } @@ -57,7 +60,18 @@ public class JdbcLeaseLockTest { @Before public void createLockTable() { - jdbcSharedStateManager = JdbcSharedStateManager.usingConnectionUrl(UUID.randomUUID().toString(), DEFAULT_LOCK_EXPIRATION_MILLIS, JDBC_URL, DRIVER_CLASS_NAME, SQL_PROVIDER); + dbConf = createDefaultDatabaseStorageConfiguration(); + sqlProvider = JDBCUtils.getSQLProvider( + dbConf.getJdbcDriverClassName(), + dbConf.getNodeManagerStoreTableName(), + SQLProvider.DatabaseStoreType.NODE_MANAGER); + jdbcSharedStateManager = JdbcSharedStateManager + .usingConnectionUrl( + UUID.randomUUID().toString(), + dbConf.getJdbcLockExpirationMillis(), + dbConf.getJdbcConnectionUrl(), + dbConf.getJdbcDriverClassName(), + sqlProvider); } @After diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index ac2e406d7d..8b3b696a8d 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -60,6 +60,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; @@ -137,11 +138,11 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.CleanupSystemPropertiesRule; import org.apache.activemq.artemis.utils.Env; import org.apache.activemq.artemis.utils.FileUtil; -import org.apache.activemq.artemis.utils.ThreadDumpUtil; -import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.RandomUtil; +import org.apache.activemq.artemis.utils.ThreadDumpUtil; import org.apache.activemq.artemis.utils.ThreadLeakCheckRule; import org.apache.activemq.artemis.utils.UUIDGenerator; +import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.jboss.logging.Logger; import org.junit.After; import org.junit.Assert; @@ -466,6 +467,10 @@ public abstract class ActiveMQTestBase extends Assert { } protected void setDBStoreType(Configuration configuration) { + configuration.setStoreConfiguration(createDefaultDatabaseStorageConfiguration()); + } + + protected DatabaseStorageConfiguration createDefaultDatabaseStorageConfiguration() { DatabaseStorageConfiguration dbStorageConfiguration = new DatabaseStorageConfiguration(); dbStorageConfiguration.setJdbcConnectionUrl(getTestJDBCConnectionUrl()); dbStorageConfiguration.setBindingsTableName("BINDINGS"); @@ -473,8 +478,22 @@ public abstract class ActiveMQTestBase extends Assert { dbStorageConfiguration.setLargeMessageTableName("LARGE_MESSAGE"); dbStorageConfiguration.setPageStoreTableName("PAGE_STORE"); dbStorageConfiguration.setJdbcDriverClassName(getJDBCClassName()); + dbStorageConfiguration.setJdbcLockAcquisitionTimeoutMillis(getJdbcLockAcquisitionTimeoutMillis()); + dbStorageConfiguration.setJdbcLockExpirationMillis(getJdbcLockExpirationMillis()); + dbStorageConfiguration.setJdbcLockRenewPeriodMillis(getJdbcLockRenewPeriodMillis()); + return dbStorageConfiguration; + } - configuration.setStoreConfiguration(dbStorageConfiguration); + protected long getJdbcLockAcquisitionTimeoutMillis() { + return Long.getLong("jdbc.lock.acquisition", ActiveMQDefaultConfiguration.getDefaultJdbcLockAcquisitionTimeoutMillis()); + } + + protected long getJdbcLockExpirationMillis() { + return Long.getLong("jdbc.lock.expiration", ActiveMQDefaultConfiguration.getDefaultJdbcLockExpirationMillis()); + } + + protected long getJdbcLockRenewPeriodMillis() { + return Long.getLong("jdbc.lock.renew", ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis()); } public void destroyTables(List tableNames) throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java index df20551129..70e625a857 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java @@ -112,7 +112,7 @@ public abstract class FailoverTestBase extends ActiveMQTestBase { liveServer.setIdentity(this.getClass().getSimpleName() + "/liveServer"); } - protected TestableServer createTestableServer(Configuration config) { + protected TestableServer createTestableServer(Configuration config) throws Exception { boolean isBackup = config.getHAPolicyConfiguration() instanceof ReplicaPolicyConfiguration || config.getHAPolicyConfiguration() instanceof SharedStoreSlavePolicyConfiguration; return new SameProcessActiveMQServer(createInVMFailoverServer(true, config, nodeManager, isBackup ? 2 : 1)); } @@ -156,7 +156,7 @@ public abstract class FailoverTestBase extends ActiveMQTestBase { /** * Override this if is needed a different implementation of {@link NodeManager} to be used into {@link #createConfigs()}. */ - protected NodeManager createNodeManager() { + protected NodeManager createNodeManager() throws Exception { return new InVMNodeManager(false); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java index 84bbc45e50..b986e4ea4c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java @@ -16,16 +16,16 @@ */ package org.apache.activemq.artemis.tests.integration.cluster.failover; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; -import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ClientConsumer; @@ -34,13 +34,17 @@ import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration; +import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.impl.InVMNodeManager; import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager; -import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider; -import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; +import org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer; +import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer; import org.apache.activemq.artemis.utils.ExecutorFactory; +import org.apache.activemq.artemis.utils.ThreadLeakCheckRule; import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.hamcrest.core.Is; import org.junit.After; @@ -50,18 +54,9 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import static org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider.Factory.SQLDialect.DERBY; - @RunWith(Parameterized.class) public class NettyFailoverTest extends FailoverTest { - private static final long JDBC_LOCK_EXPIRATION_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockExpirationMillis(); - private static final long JDBC_LOCK_RENEW_PERIOD_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis(); - private static final long JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockAcquisitionTimeoutMillis(); - private static final SQLProvider SQL_PROVIDER = new PropertySQLProvider.Factory(DERBY).create(ActiveMQDefaultConfiguration.getDefaultNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER); - private static final String JDBC_URL = "jdbc:derby:memory:server_lock_db;create=true"; - private static final String DRIVER_CLASS_NAME = "org.apache.derby.jdbc.EmbeddedDriver"; - public enum NodeManagerType { InVM, Jdbc } @@ -84,8 +79,8 @@ public class NettyFailoverTest extends FailoverTest { return getNettyConnectorTransportConfiguration(live); } - private ScheduledExecutorService scheduledExecutorService; - private ExecutorService executor; + private List scheduledExecutorServices = new ArrayList<>(); + private List executors = new ArrayList<>(); @Override protected NodeManager createReplicatedBackupNodeManager(Configuration backupConfig) { @@ -94,23 +89,25 @@ public class NettyFailoverTest extends FailoverTest { } @Override - protected NodeManager createNodeManager() { + protected NodeManager createNodeManager() throws Exception { switch (nodeManagerType) { case InVM: return new InVMNodeManager(false); case Jdbc: - //It can uses an in memory JavaDB: the failover tests are in process final ThreadFactory daemonThreadFactory = t -> { final Thread th = new Thread(t); th.setDaemon(true); return th; }; - scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(daemonThreadFactory); - executor = Executors.newFixedThreadPool(2, daemonThreadFactory); + final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(daemonThreadFactory); + scheduledExecutorServices.add(scheduledExecutorService); + final ExecutorService executor = Executors.newFixedThreadPool(2, daemonThreadFactory); + executors.add(executor); + final DatabaseStorageConfiguration dbConf = createDefaultDatabaseStorageConfiguration(); final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor); - return JdbcNodeManager.usingConnectionUrl(UUID.randomUUID().toString(), JDBC_LOCK_EXPIRATION_MILLIS, JDBC_LOCK_RENEW_PERIOD_MILLIS, JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS, JDBC_URL, DRIVER_CLASS_NAME, SQL_PROVIDER, scheduledExecutorService, executorFactory, (code, message, file) -> { + return JdbcNodeManager.with(dbConf, scheduledExecutorService, executorFactory, (code, message, file) -> { code.printStackTrace(); Assert.fail(message); }); @@ -119,13 +116,27 @@ public class NettyFailoverTest extends FailoverTest { } } + + @Override + protected TestableServer createTestableServer(Configuration config) throws Exception { + final boolean isBackup = config.getHAPolicyConfiguration() instanceof ReplicaPolicyConfiguration || config.getHAPolicyConfiguration() instanceof SharedStoreSlavePolicyConfiguration; + NodeManager nodeManager = this.nodeManager; + //create a separate NodeManager for the backup + if (isBackup && nodeManagerType == NodeManagerType.Jdbc) { + nodeManager = createNodeManager(); + } + return new SameProcessActiveMQServer(createInVMFailoverServer(true, config, nodeManager, isBackup ? 2 : 1)); + } + + @After public void shutDownExecutors() { - if (scheduledExecutorService != null) { - executor.shutdown(); - scheduledExecutorService.shutdown(); - this.executor = null; - this.scheduledExecutorService = null; + if (!scheduledExecutorServices.isEmpty()) { + ThreadLeakCheckRule.addKownThread("oracle.jdbc.driver.BlockSource.ThreadedCachingBlockSource.BlockReleaser"); + executors.forEach(ExecutorService::shutdown); + scheduledExecutorServices.forEach(ExecutorService::shutdown); + executors.clear(); + scheduledExecutorServices.clear(); } }