This commit is contained in:
Francesco Nigro 2018-01-30 14:07:16 +01:00
commit 8fb3c912c0
5 changed files with 137 additions and 51 deletions

View File

@ -44,9 +44,12 @@ public final class JdbcNodeManager extends NodeManager {
private static final Logger logger = Logger.getLogger(JdbcNodeManager.class);
private static final long MAX_PAUSE_MILLIS = 2000L;
private final SharedStateManager sharedStateManager;
private final ScheduledLeaseLock scheduledLiveLock;
private final ScheduledLeaseLock scheduledBackupLock;
private final Supplier<? extends SharedStateManager> sharedStateManagerFactory;
private final Supplier<? extends ScheduledLeaseLock> scheduledLiveLockFactory;
private final Supplier<? extends ScheduledLeaseLock> scheduledBackupLockFactory;
private SharedStateManager sharedStateManager;
private ScheduledLeaseLock scheduledLiveLock;
private ScheduledLeaseLock scheduledBackupLock;
private final long lockRenewPeriodMillis;
private final long lockAcquisitionTimeoutMillis;
private volatile boolean interrupted = false;
@ -82,7 +85,14 @@ public final class JdbcNodeManager extends NodeManager {
ScheduledExecutorService scheduledExecutorService,
ExecutorFactory executorFactory,
IOCriticalErrorListener ioCriticalErrorListener) {
return new JdbcNodeManager(JdbcSharedStateManager.usingDataSource(brokerId, lockExpirationMillis, dataSource, provider), false, lockRenewPeriodMillis, lockAcquisitionTimeoutMillis, scheduledExecutorService, executorFactory, ioCriticalErrorListener);
return new JdbcNodeManager(
() -> JdbcSharedStateManager.usingDataSource(brokerId, lockExpirationMillis, dataSource, provider),
false,
lockRenewPeriodMillis,
lockAcquisitionTimeoutMillis,
scheduledExecutorService,
executorFactory,
ioCriticalErrorListener);
}
public static JdbcNodeManager usingConnectionUrl(String brokerId,
@ -95,10 +105,17 @@ public final class JdbcNodeManager extends NodeManager {
ScheduledExecutorService scheduledExecutorService,
ExecutorFactory executorFactory,
IOCriticalErrorListener ioCriticalErrorListener) {
return new JdbcNodeManager(JdbcSharedStateManager.usingConnectionUrl(brokerId, lockExpirationMillis, jdbcUrl, driverClass, provider), false, lockRenewPeriodMillis, lockAcquisitionTimeoutMillis, scheduledExecutorService, executorFactory, ioCriticalErrorListener);
return new JdbcNodeManager(
() -> JdbcSharedStateManager.usingConnectionUrl(brokerId, lockExpirationMillis, jdbcUrl, driverClass, provider),
false,
lockRenewPeriodMillis,
lockAcquisitionTimeoutMillis,
scheduledExecutorService,
executorFactory,
ioCriticalErrorListener);
}
private JdbcNodeManager(final SharedStateManager sharedStateManager,
private JdbcNodeManager(Supplier<? extends SharedStateManager> sharedStateManagerFactory,
boolean replicatedBackup,
long lockRenewPeriodMillis,
long lockAcquisitionTimeoutMillis,
@ -109,10 +126,26 @@ public final class JdbcNodeManager extends NodeManager {
this.lockAcquisitionTimeoutMillis = lockAcquisitionTimeoutMillis;
this.lockRenewPeriodMillis = lockRenewPeriodMillis;
this.pauser = LeaseLock.Pauser.sleep(Math.min(this.lockRenewPeriodMillis, MAX_PAUSE_MILLIS), TimeUnit.MILLISECONDS);
this.sharedStateManager = sharedStateManager;
this.scheduledLiveLock = ScheduledLeaseLock.of(scheduledExecutorService, executorFactory != null ? executorFactory.getExecutor() : null, "live", this.sharedStateManager.liveLock(), lockRenewPeriodMillis, ioCriticalErrorListener);
this.scheduledBackupLock = ScheduledLeaseLock.of(scheduledExecutorService, executorFactory != null ? executorFactory.getExecutor() : null, "backup", this.sharedStateManager.backupLock(), lockRenewPeriodMillis, ioCriticalErrorListener);
this.sharedStateManagerFactory = sharedStateManagerFactory;
this.scheduledLiveLockFactory = () -> ScheduledLeaseLock.of(
scheduledExecutorService,
executorFactory != null ? executorFactory.getExecutor() : null,
"live",
this.sharedStateManager.liveLock(),
lockRenewPeriodMillis,
ioCriticalErrorListener);
this.scheduledBackupLockFactory = () -> ScheduledLeaseLock.of(
scheduledExecutorService,
executorFactory != null ?
executorFactory.getExecutor() : null,
"backup",
this.sharedStateManager.backupLock(),
lockRenewPeriodMillis,
ioCriticalErrorListener);
this.ioCriticalErrorListener = ioCriticalErrorListener;
this.sharedStateManager = null;
this.scheduledLiveLock = null;
this.scheduledBackupLock = null;
}
@Override
@ -122,13 +155,19 @@ public final class JdbcNodeManager extends NodeManager {
if (isStarted()) {
return;
}
this.sharedStateManager = sharedStateManagerFactory.get();
if (!replicatedBackup) {
final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID);
setUUID(nodeId);
}
this.scheduledLiveLock = scheduledLiveLockFactory.get();
this.scheduledBackupLock = scheduledBackupLockFactory.get();
super.start();
}
} catch (IllegalStateException e) {
this.sharedStateManager = null;
this.scheduledLiveLock = null;
this.scheduledBackupLock = null;
if (this.ioCriticalErrorListener != null) {
this.ioCriticalErrorListener.onIOException(e, "Failed to setup the JdbcNodeManager", null);
}
@ -145,6 +184,9 @@ public final class JdbcNodeManager extends NodeManager {
} finally {
super.stop();
this.sharedStateManager.close();
this.sharedStateManager = null;
this.scheduledLiveLock = null;
this.scheduledBackupLock = null;
}
}
}

View File

@ -25,31 +25,34 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider.Factory.SQLDialect.DERBY;
public class JdbcLeaseLockTest extends ActiveMQTestBase {
public class JdbcLeaseLockTest {
private static final long DEFAULT_LOCK_EXPIRATION_MILLIS = TimeUnit.SECONDS.toMillis(10);
private static final SQLProvider SQL_PROVIDER = new PropertySQLProvider.Factory(DERBY).create(ActiveMQDefaultConfiguration.getDefaultNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER);
private static final String JDBC_URL = "jdbc:derby:memory:server_lock_db;create=true";
private static final String DRIVER_CLASS_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
private JdbcSharedStateManager jdbcSharedStateManager;
private DatabaseStorageConfiguration dbConf;
private SQLProvider sqlProvider;
private LeaseLock lock() {
return lock(DEFAULT_LOCK_EXPIRATION_MILLIS);
return lock(dbConf.getJdbcLockExpirationMillis());
}
private LeaseLock lock(long acquireMillis) {
try {
return JdbcSharedStateManager.createLiveLock(UUID.randomUUID().toString(), jdbcSharedStateManager.getConnection(), SQL_PROVIDER, acquireMillis, 0);
return JdbcSharedStateManager
.createLiveLock(
UUID.randomUUID().toString(),
jdbcSharedStateManager.getConnection(),
sqlProvider,
acquireMillis,
0);
} catch (SQLException e) {
throw new IllegalStateException(e);
}
@ -57,7 +60,18 @@ public class JdbcLeaseLockTest {
@Before
public void createLockTable() {
jdbcSharedStateManager = JdbcSharedStateManager.usingConnectionUrl(UUID.randomUUID().toString(), DEFAULT_LOCK_EXPIRATION_MILLIS, JDBC_URL, DRIVER_CLASS_NAME, SQL_PROVIDER);
dbConf = createDefaultDatabaseStorageConfiguration();
sqlProvider = JDBCUtils.getSQLProvider(
dbConf.getJdbcDriverClassName(),
dbConf.getNodeManagerStoreTableName(),
SQLProvider.DatabaseStoreType.NODE_MANAGER);
jdbcSharedStateManager = JdbcSharedStateManager
.usingConnectionUrl(
UUID.randomUUID().toString(),
dbConf.getJdbcLockExpirationMillis(),
dbConf.getJdbcConnectionUrl(),
dbConf.getJdbcDriverClassName(),
sqlProvider);
}
@After

View File

@ -60,6 +60,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@ -137,11 +138,11 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.CleanupSystemPropertiesRule;
import org.apache.activemq.artemis.utils.Env;
import org.apache.activemq.artemis.utils.FileUtil;
import org.apache.activemq.artemis.utils.ThreadDumpUtil;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.ThreadDumpUtil;
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.jboss.logging.Logger;
import org.junit.After;
import org.junit.Assert;
@ -466,6 +467,10 @@ public abstract class ActiveMQTestBase extends Assert {
}
protected void setDBStoreType(Configuration configuration) {
configuration.setStoreConfiguration(createDefaultDatabaseStorageConfiguration());
}
protected DatabaseStorageConfiguration createDefaultDatabaseStorageConfiguration() {
DatabaseStorageConfiguration dbStorageConfiguration = new DatabaseStorageConfiguration();
dbStorageConfiguration.setJdbcConnectionUrl(getTestJDBCConnectionUrl());
dbStorageConfiguration.setBindingsTableName("BINDINGS");
@ -473,8 +478,22 @@ public abstract class ActiveMQTestBase extends Assert {
dbStorageConfiguration.setLargeMessageTableName("LARGE_MESSAGE");
dbStorageConfiguration.setPageStoreTableName("PAGE_STORE");
dbStorageConfiguration.setJdbcDriverClassName(getJDBCClassName());
dbStorageConfiguration.setJdbcLockAcquisitionTimeoutMillis(getJdbcLockAcquisitionTimeoutMillis());
dbStorageConfiguration.setJdbcLockExpirationMillis(getJdbcLockExpirationMillis());
dbStorageConfiguration.setJdbcLockRenewPeriodMillis(getJdbcLockRenewPeriodMillis());
return dbStorageConfiguration;
}
configuration.setStoreConfiguration(dbStorageConfiguration);
protected long getJdbcLockAcquisitionTimeoutMillis() {
return Long.getLong("jdbc.lock.acquisition", ActiveMQDefaultConfiguration.getDefaultJdbcLockAcquisitionTimeoutMillis());
}
protected long getJdbcLockExpirationMillis() {
return Long.getLong("jdbc.lock.expiration", ActiveMQDefaultConfiguration.getDefaultJdbcLockExpirationMillis());
}
protected long getJdbcLockRenewPeriodMillis() {
return Long.getLong("jdbc.lock.renew", ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis());
}
public void destroyTables(List<String> tableNames) throws Exception {

View File

@ -112,7 +112,7 @@ public abstract class FailoverTestBase extends ActiveMQTestBase {
liveServer.setIdentity(this.getClass().getSimpleName() + "/liveServer");
}
protected TestableServer createTestableServer(Configuration config) {
protected TestableServer createTestableServer(Configuration config) throws Exception {
boolean isBackup = config.getHAPolicyConfiguration() instanceof ReplicaPolicyConfiguration || config.getHAPolicyConfiguration() instanceof SharedStoreSlavePolicyConfiguration;
return new SameProcessActiveMQServer(createInVMFailoverServer(true, config, nodeManager, isBackup ? 2 : 1));
}
@ -156,7 +156,7 @@ public abstract class FailoverTestBase extends ActiveMQTestBase {
/**
* Override this if is needed a different implementation of {@link NodeManager} to be used into {@link #createConfigs()}.
*/
protected NodeManager createNodeManager() {
protected NodeManager createNodeManager() throws Exception {
return new InVMNodeManager(false);
}

View File

@ -16,16 +16,16 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster.failover;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@ -34,13 +34,17 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager;
import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer;
import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.hamcrest.core.Is;
import org.junit.After;
@ -50,18 +54,9 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider.Factory.SQLDialect.DERBY;
@RunWith(Parameterized.class)
public class NettyFailoverTest extends FailoverTest {
private static final long JDBC_LOCK_EXPIRATION_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockExpirationMillis();
private static final long JDBC_LOCK_RENEW_PERIOD_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis();
private static final long JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockAcquisitionTimeoutMillis();
private static final SQLProvider SQL_PROVIDER = new PropertySQLProvider.Factory(DERBY).create(ActiveMQDefaultConfiguration.getDefaultNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER);
private static final String JDBC_URL = "jdbc:derby:memory:server_lock_db;create=true";
private static final String DRIVER_CLASS_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
public enum NodeManagerType {
InVM, Jdbc
}
@ -84,8 +79,8 @@ public class NettyFailoverTest extends FailoverTest {
return getNettyConnectorTransportConfiguration(live);
}
private ScheduledExecutorService scheduledExecutorService;
private ExecutorService executor;
private List<ScheduledExecutorService> scheduledExecutorServices = new ArrayList<>();
private List<ExecutorService> executors = new ArrayList<>();
@Override
protected NodeManager createReplicatedBackupNodeManager(Configuration backupConfig) {
@ -94,23 +89,25 @@ public class NettyFailoverTest extends FailoverTest {
}
@Override
protected NodeManager createNodeManager() {
protected NodeManager createNodeManager() throws Exception {
switch (nodeManagerType) {
case InVM:
return new InVMNodeManager(false);
case Jdbc:
//It can uses an in memory JavaDB: the failover tests are in process
final ThreadFactory daemonThreadFactory = t -> {
final Thread th = new Thread(t);
th.setDaemon(true);
return th;
};
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(daemonThreadFactory);
executor = Executors.newFixedThreadPool(2, daemonThreadFactory);
final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(daemonThreadFactory);
scheduledExecutorServices.add(scheduledExecutorService);
final ExecutorService executor = Executors.newFixedThreadPool(2, daemonThreadFactory);
executors.add(executor);
final DatabaseStorageConfiguration dbConf = createDefaultDatabaseStorageConfiguration();
final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
return JdbcNodeManager.usingConnectionUrl(UUID.randomUUID().toString(), JDBC_LOCK_EXPIRATION_MILLIS, JDBC_LOCK_RENEW_PERIOD_MILLIS, JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS, JDBC_URL, DRIVER_CLASS_NAME, SQL_PROVIDER, scheduledExecutorService, executorFactory, (code, message, file) -> {
return JdbcNodeManager.with(dbConf, scheduledExecutorService, executorFactory, (code, message, file) -> {
code.printStackTrace();
Assert.fail(message);
});
@ -119,13 +116,27 @@ public class NettyFailoverTest extends FailoverTest {
}
}
@Override
protected TestableServer createTestableServer(Configuration config) throws Exception {
final boolean isBackup = config.getHAPolicyConfiguration() instanceof ReplicaPolicyConfiguration || config.getHAPolicyConfiguration() instanceof SharedStoreSlavePolicyConfiguration;
NodeManager nodeManager = this.nodeManager;
//create a separate NodeManager for the backup
if (isBackup && nodeManagerType == NodeManagerType.Jdbc) {
nodeManager = createNodeManager();
}
return new SameProcessActiveMQServer(createInVMFailoverServer(true, config, nodeManager, isBackup ? 2 : 1));
}
@After
public void shutDownExecutors() {
if (scheduledExecutorService != null) {
executor.shutdown();
scheduledExecutorService.shutdown();
this.executor = null;
this.scheduledExecutorService = null;
if (!scheduledExecutorServices.isEmpty()) {
ThreadLeakCheckRule.addKownThread("oracle.jdbc.driver.BlockSource.ThreadedCachingBlockSource.BlockReleaser");
executors.forEach(ExecutorService::shutdown);
scheduledExecutorServices.forEach(ExecutorService::shutdown);
executors.clear();
scheduledExecutorServices.clear();
}
}