diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java index b2d9d3fc2b..2360df6346 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java @@ -44,9 +44,12 @@ public final class JdbcNodeManager extends NodeManager { private static final Logger logger = Logger.getLogger(JdbcNodeManager.class); private static final long MAX_PAUSE_MILLIS = 2000L; - private final SharedStateManager sharedStateManager; - private final ScheduledLeaseLock scheduledLiveLock; - private final ScheduledLeaseLock scheduledBackupLock; + private final Supplier sharedStateManagerFactory; + private final Supplier scheduledLiveLockFactory; + private final Supplier scheduledBackupLockFactory; + private SharedStateManager sharedStateManager; + private ScheduledLeaseLock scheduledLiveLock; + private ScheduledLeaseLock scheduledBackupLock; private final long lockRenewPeriodMillis; private final long lockAcquisitionTimeoutMillis; private volatile boolean interrupted = false; @@ -82,7 +85,14 @@ public final class JdbcNodeManager extends NodeManager { ScheduledExecutorService scheduledExecutorService, ExecutorFactory executorFactory, IOCriticalErrorListener ioCriticalErrorListener) { - return new JdbcNodeManager(JdbcSharedStateManager.usingDataSource(brokerId, lockExpirationMillis, dataSource, provider), false, lockRenewPeriodMillis, lockAcquisitionTimeoutMillis, scheduledExecutorService, executorFactory, ioCriticalErrorListener); + return new JdbcNodeManager( + () -> JdbcSharedStateManager.usingDataSource(brokerId, lockExpirationMillis, dataSource, provider), + false, + lockRenewPeriodMillis, + lockAcquisitionTimeoutMillis, + scheduledExecutorService, + executorFactory, + ioCriticalErrorListener); } public static JdbcNodeManager usingConnectionUrl(String brokerId, @@ -95,10 +105,17 @@ public final class JdbcNodeManager extends NodeManager { ScheduledExecutorService scheduledExecutorService, ExecutorFactory executorFactory, IOCriticalErrorListener ioCriticalErrorListener) { - return new JdbcNodeManager(JdbcSharedStateManager.usingConnectionUrl(brokerId, lockExpirationMillis, jdbcUrl, driverClass, provider), false, lockRenewPeriodMillis, lockAcquisitionTimeoutMillis, scheduledExecutorService, executorFactory, ioCriticalErrorListener); + return new JdbcNodeManager( + () -> JdbcSharedStateManager.usingConnectionUrl(brokerId, lockExpirationMillis, jdbcUrl, driverClass, provider), + false, + lockRenewPeriodMillis, + lockAcquisitionTimeoutMillis, + scheduledExecutorService, + executorFactory, + ioCriticalErrorListener); } - private JdbcNodeManager(final SharedStateManager sharedStateManager, + private JdbcNodeManager(Supplier sharedStateManagerFactory, boolean replicatedBackup, long lockRenewPeriodMillis, long lockAcquisitionTimeoutMillis, @@ -109,10 +126,26 @@ public final class JdbcNodeManager extends NodeManager { this.lockAcquisitionTimeoutMillis = lockAcquisitionTimeoutMillis; this.lockRenewPeriodMillis = lockRenewPeriodMillis; this.pauser = LeaseLock.Pauser.sleep(Math.min(this.lockRenewPeriodMillis, MAX_PAUSE_MILLIS), TimeUnit.MILLISECONDS); - this.sharedStateManager = sharedStateManager; - this.scheduledLiveLock = ScheduledLeaseLock.of(scheduledExecutorService, executorFactory != null ? executorFactory.getExecutor() : null, "live", this.sharedStateManager.liveLock(), lockRenewPeriodMillis, ioCriticalErrorListener); - this.scheduledBackupLock = ScheduledLeaseLock.of(scheduledExecutorService, executorFactory != null ? executorFactory.getExecutor() : null, "backup", this.sharedStateManager.backupLock(), lockRenewPeriodMillis, ioCriticalErrorListener); + this.sharedStateManagerFactory = sharedStateManagerFactory; + this.scheduledLiveLockFactory = () -> ScheduledLeaseLock.of( + scheduledExecutorService, + executorFactory != null ? executorFactory.getExecutor() : null, + "live", + this.sharedStateManager.liveLock(), + lockRenewPeriodMillis, + ioCriticalErrorListener); + this.scheduledBackupLockFactory = () -> ScheduledLeaseLock.of( + scheduledExecutorService, + executorFactory != null ? + executorFactory.getExecutor() : null, + "backup", + this.sharedStateManager.backupLock(), + lockRenewPeriodMillis, + ioCriticalErrorListener); this.ioCriticalErrorListener = ioCriticalErrorListener; + this.sharedStateManager = null; + this.scheduledLiveLock = null; + this.scheduledBackupLock = null; } @Override @@ -122,13 +155,19 @@ public final class JdbcNodeManager extends NodeManager { if (isStarted()) { return; } + this.sharedStateManager = sharedStateManagerFactory.get(); if (!replicatedBackup) { final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID); setUUID(nodeId); } + this.scheduledLiveLock = scheduledLiveLockFactory.get(); + this.scheduledBackupLock = scheduledBackupLockFactory.get(); super.start(); } } catch (IllegalStateException e) { + this.sharedStateManager = null; + this.scheduledLiveLock = null; + this.scheduledBackupLock = null; if (this.ioCriticalErrorListener != null) { this.ioCriticalErrorListener.onIOException(e, "Failed to setup the JdbcNodeManager", null); } @@ -145,6 +184,9 @@ public final class JdbcNodeManager extends NodeManager { } finally { super.stop(); this.sharedStateManager.close(); + this.sharedStateManager = null; + this.scheduledLiveLock = null; + this.scheduledBackupLock = null; } } } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java index 201db6a8cb..d4b63de0df 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java @@ -25,31 +25,34 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; -import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; -import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider; +import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration; +import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import static org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider.Factory.SQLDialect.DERBY; +public class JdbcLeaseLockTest extends ActiveMQTestBase { -public class JdbcLeaseLockTest { - - private static final long DEFAULT_LOCK_EXPIRATION_MILLIS = TimeUnit.SECONDS.toMillis(10); - private static final SQLProvider SQL_PROVIDER = new PropertySQLProvider.Factory(DERBY).create(ActiveMQDefaultConfiguration.getDefaultNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER); - private static final String JDBC_URL = "jdbc:derby:memory:server_lock_db;create=true"; - private static final String DRIVER_CLASS_NAME = "org.apache.derby.jdbc.EmbeddedDriver"; private JdbcSharedStateManager jdbcSharedStateManager; + private DatabaseStorageConfiguration dbConf; + private SQLProvider sqlProvider; private LeaseLock lock() { - return lock(DEFAULT_LOCK_EXPIRATION_MILLIS); + return lock(dbConf.getJdbcLockExpirationMillis()); } private LeaseLock lock(long acquireMillis) { try { - return JdbcSharedStateManager.createLiveLock(UUID.randomUUID().toString(), jdbcSharedStateManager.getConnection(), SQL_PROVIDER, acquireMillis, 0); + return JdbcSharedStateManager + .createLiveLock( + UUID.randomUUID().toString(), + jdbcSharedStateManager.getConnection(), + sqlProvider, + acquireMillis, + 0); } catch (SQLException e) { throw new IllegalStateException(e); } @@ -57,7 +60,18 @@ public class JdbcLeaseLockTest { @Before public void createLockTable() { - jdbcSharedStateManager = JdbcSharedStateManager.usingConnectionUrl(UUID.randomUUID().toString(), DEFAULT_LOCK_EXPIRATION_MILLIS, JDBC_URL, DRIVER_CLASS_NAME, SQL_PROVIDER); + dbConf = createDefaultDatabaseStorageConfiguration(); + sqlProvider = JDBCUtils.getSQLProvider( + dbConf.getJdbcDriverClassName(), + dbConf.getNodeManagerStoreTableName(), + SQLProvider.DatabaseStoreType.NODE_MANAGER); + jdbcSharedStateManager = JdbcSharedStateManager + .usingConnectionUrl( + UUID.randomUUID().toString(), + dbConf.getJdbcLockExpirationMillis(), + dbConf.getJdbcConnectionUrl(), + dbConf.getJdbcDriverClassName(), + sqlProvider); } @After diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index ac2e406d7d..8b3b696a8d 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -60,6 +60,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; @@ -137,11 +138,11 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.CleanupSystemPropertiesRule; import org.apache.activemq.artemis.utils.Env; import org.apache.activemq.artemis.utils.FileUtil; -import org.apache.activemq.artemis.utils.ThreadDumpUtil; -import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.RandomUtil; +import org.apache.activemq.artemis.utils.ThreadDumpUtil; import org.apache.activemq.artemis.utils.ThreadLeakCheckRule; import org.apache.activemq.artemis.utils.UUIDGenerator; +import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.jboss.logging.Logger; import org.junit.After; import org.junit.Assert; @@ -466,6 +467,10 @@ public abstract class ActiveMQTestBase extends Assert { } protected void setDBStoreType(Configuration configuration) { + configuration.setStoreConfiguration(createDefaultDatabaseStorageConfiguration()); + } + + protected DatabaseStorageConfiguration createDefaultDatabaseStorageConfiguration() { DatabaseStorageConfiguration dbStorageConfiguration = new DatabaseStorageConfiguration(); dbStorageConfiguration.setJdbcConnectionUrl(getTestJDBCConnectionUrl()); dbStorageConfiguration.setBindingsTableName("BINDINGS"); @@ -473,8 +478,22 @@ public abstract class ActiveMQTestBase extends Assert { dbStorageConfiguration.setLargeMessageTableName("LARGE_MESSAGE"); dbStorageConfiguration.setPageStoreTableName("PAGE_STORE"); dbStorageConfiguration.setJdbcDriverClassName(getJDBCClassName()); + dbStorageConfiguration.setJdbcLockAcquisitionTimeoutMillis(getJdbcLockAcquisitionTimeoutMillis()); + dbStorageConfiguration.setJdbcLockExpirationMillis(getJdbcLockExpirationMillis()); + dbStorageConfiguration.setJdbcLockRenewPeriodMillis(getJdbcLockRenewPeriodMillis()); + return dbStorageConfiguration; + } - configuration.setStoreConfiguration(dbStorageConfiguration); + protected long getJdbcLockAcquisitionTimeoutMillis() { + return Long.getLong("jdbc.lock.acquisition", ActiveMQDefaultConfiguration.getDefaultJdbcLockAcquisitionTimeoutMillis()); + } + + protected long getJdbcLockExpirationMillis() { + return Long.getLong("jdbc.lock.expiration", ActiveMQDefaultConfiguration.getDefaultJdbcLockExpirationMillis()); + } + + protected long getJdbcLockRenewPeriodMillis() { + return Long.getLong("jdbc.lock.renew", ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis()); } public void destroyTables(List tableNames) throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java index df20551129..70e625a857 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java @@ -112,7 +112,7 @@ public abstract class FailoverTestBase extends ActiveMQTestBase { liveServer.setIdentity(this.getClass().getSimpleName() + "/liveServer"); } - protected TestableServer createTestableServer(Configuration config) { + protected TestableServer createTestableServer(Configuration config) throws Exception { boolean isBackup = config.getHAPolicyConfiguration() instanceof ReplicaPolicyConfiguration || config.getHAPolicyConfiguration() instanceof SharedStoreSlavePolicyConfiguration; return new SameProcessActiveMQServer(createInVMFailoverServer(true, config, nodeManager, isBackup ? 2 : 1)); } @@ -156,7 +156,7 @@ public abstract class FailoverTestBase extends ActiveMQTestBase { /** * Override this if is needed a different implementation of {@link NodeManager} to be used into {@link #createConfigs()}. */ - protected NodeManager createNodeManager() { + protected NodeManager createNodeManager() throws Exception { return new InVMNodeManager(false); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java index 84bbc45e50..b986e4ea4c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java @@ -16,16 +16,16 @@ */ package org.apache.activemq.artemis.tests.integration.cluster.failover; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; -import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ClientConsumer; @@ -34,13 +34,17 @@ import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration; +import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.impl.InVMNodeManager; import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager; -import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider; -import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; +import org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer; +import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer; import org.apache.activemq.artemis.utils.ExecutorFactory; +import org.apache.activemq.artemis.utils.ThreadLeakCheckRule; import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.hamcrest.core.Is; import org.junit.After; @@ -50,18 +54,9 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import static org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider.Factory.SQLDialect.DERBY; - @RunWith(Parameterized.class) public class NettyFailoverTest extends FailoverTest { - private static final long JDBC_LOCK_EXPIRATION_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockExpirationMillis(); - private static final long JDBC_LOCK_RENEW_PERIOD_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis(); - private static final long JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockAcquisitionTimeoutMillis(); - private static final SQLProvider SQL_PROVIDER = new PropertySQLProvider.Factory(DERBY).create(ActiveMQDefaultConfiguration.getDefaultNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER); - private static final String JDBC_URL = "jdbc:derby:memory:server_lock_db;create=true"; - private static final String DRIVER_CLASS_NAME = "org.apache.derby.jdbc.EmbeddedDriver"; - public enum NodeManagerType { InVM, Jdbc } @@ -84,8 +79,8 @@ public class NettyFailoverTest extends FailoverTest { return getNettyConnectorTransportConfiguration(live); } - private ScheduledExecutorService scheduledExecutorService; - private ExecutorService executor; + private List scheduledExecutorServices = new ArrayList<>(); + private List executors = new ArrayList<>(); @Override protected NodeManager createReplicatedBackupNodeManager(Configuration backupConfig) { @@ -94,23 +89,25 @@ public class NettyFailoverTest extends FailoverTest { } @Override - protected NodeManager createNodeManager() { + protected NodeManager createNodeManager() throws Exception { switch (nodeManagerType) { case InVM: return new InVMNodeManager(false); case Jdbc: - //It can uses an in memory JavaDB: the failover tests are in process final ThreadFactory daemonThreadFactory = t -> { final Thread th = new Thread(t); th.setDaemon(true); return th; }; - scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(daemonThreadFactory); - executor = Executors.newFixedThreadPool(2, daemonThreadFactory); + final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(daemonThreadFactory); + scheduledExecutorServices.add(scheduledExecutorService); + final ExecutorService executor = Executors.newFixedThreadPool(2, daemonThreadFactory); + executors.add(executor); + final DatabaseStorageConfiguration dbConf = createDefaultDatabaseStorageConfiguration(); final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor); - return JdbcNodeManager.usingConnectionUrl(UUID.randomUUID().toString(), JDBC_LOCK_EXPIRATION_MILLIS, JDBC_LOCK_RENEW_PERIOD_MILLIS, JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS, JDBC_URL, DRIVER_CLASS_NAME, SQL_PROVIDER, scheduledExecutorService, executorFactory, (code, message, file) -> { + return JdbcNodeManager.with(dbConf, scheduledExecutorService, executorFactory, (code, message, file) -> { code.printStackTrace(); Assert.fail(message); }); @@ -119,13 +116,27 @@ public class NettyFailoverTest extends FailoverTest { } } + + @Override + protected TestableServer createTestableServer(Configuration config) throws Exception { + final boolean isBackup = config.getHAPolicyConfiguration() instanceof ReplicaPolicyConfiguration || config.getHAPolicyConfiguration() instanceof SharedStoreSlavePolicyConfiguration; + NodeManager nodeManager = this.nodeManager; + //create a separate NodeManager for the backup + if (isBackup && nodeManagerType == NodeManagerType.Jdbc) { + nodeManager = createNodeManager(); + } + return new SameProcessActiveMQServer(createInVMFailoverServer(true, config, nodeManager, isBackup ? 2 : 1)); + } + + @After public void shutDownExecutors() { - if (scheduledExecutorService != null) { - executor.shutdown(); - scheduledExecutorService.shutdown(); - this.executor = null; - this.scheduledExecutorService = null; + if (!scheduledExecutorServices.isEmpty()) { + ThreadLeakCheckRule.addKownThread("oracle.jdbc.driver.BlockSource.ThreadedCachingBlockSource.BlockReleaser"); + executors.forEach(ExecutorService::shutdown); + scheduledExecutorServices.forEach(ExecutorService::shutdown); + executors.clear(); + scheduledExecutorServices.clear(); } }