ARTEMIS-1509 Add support for JdbcNodeManager into the NettyFailoverTest
This commit is contained in:
parent
d03c4c8cc7
commit
8e8a6f0faf
|
@ -111,7 +111,9 @@ public class FailoverTest extends FailoverTestBase {
|
|||
public void testTimeoutOnFailover() throws Exception {
|
||||
locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(100);
|
||||
|
||||
((InVMNodeManager) nodeManager).failoverPause = 500;
|
||||
if (nodeManager instanceof InVMNodeManager) {
|
||||
((InVMNodeManager) nodeManager).failoverPause = 500L;
|
||||
}
|
||||
|
||||
ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
|
||||
|
||||
|
@ -176,7 +178,9 @@ public class FailoverTest extends FailoverTestBase {
|
|||
public void testTimeoutOnFailoverConsume() throws Exception {
|
||||
locator.setCallTimeout(5000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setBlockOnAcknowledge(true).setReconnectAttempts(300).setRetryInterval(100).setAckBatchSize(0);
|
||||
|
||||
((InVMNodeManager) nodeManager).failoverPause = 5000L;
|
||||
if (nodeManager instanceof InVMNodeManager) {
|
||||
((InVMNodeManager) nodeManager).failoverPause = 5000L;
|
||||
}
|
||||
|
||||
ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
|
||||
|
||||
|
@ -237,7 +241,9 @@ public class FailoverTest extends FailoverTestBase {
|
|||
public void testTimeoutOnFailoverConsumeBlocked() throws Exception {
|
||||
locator.setCallTimeout(5000).setBlockOnNonDurableSend(true).setConsumerWindowSize(0).setBlockOnDurableSend(true).setAckBatchSize(0).setBlockOnAcknowledge(true).setReconnectAttempts(-1).setAckBatchSize(0);
|
||||
|
||||
((InVMNodeManager) nodeManager).failoverPause = 5000L;
|
||||
if (nodeManager instanceof InVMNodeManager) {
|
||||
((InVMNodeManager) nodeManager).failoverPause = 5000L;
|
||||
}
|
||||
|
||||
ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
|
||||
|
||||
|
@ -330,7 +336,9 @@ public class FailoverTest extends FailoverTestBase {
|
|||
public void testTimeoutOnFailoverTransactionCommit() throws Exception {
|
||||
locator.setCallTimeout(5000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(100);
|
||||
|
||||
((InVMNodeManager) nodeManager).failoverPause = 5000L;
|
||||
if (nodeManager instanceof InVMNodeManager) {
|
||||
((InVMNodeManager) nodeManager).failoverPause = 5000L;
|
||||
}
|
||||
|
||||
ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
|
||||
|
||||
|
@ -397,12 +405,13 @@ public class FailoverTest extends FailoverTestBase {
|
|||
public void testTimeoutOnFailoverTransactionCommitTimeoutCommunication() throws Exception {
|
||||
locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(500);
|
||||
|
||||
((InVMNodeManager) nodeManager).failoverPause = 6000L;
|
||||
if (nodeManager instanceof InVMNodeManager) {
|
||||
((InVMNodeManager) nodeManager).failoverPause = 6000L;
|
||||
}
|
||||
|
||||
ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
|
||||
final ClientSession session = createSession(sf1, false, false, false);
|
||||
|
||||
|
||||
session.createQueue(FailoverTestBase.ADDRESS, RoutingType.MULTICAST, FailoverTestBase.ADDRESS, null, true);
|
||||
|
||||
final CountDownLatch connectionFailed = new CountDownLatch(1);
|
||||
|
@ -474,7 +483,9 @@ public class FailoverTest extends FailoverTestBase {
|
|||
public void testTimeoutOnFailoverTransactionRollback() throws Exception {
|
||||
locator.setCallTimeout(2000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(100);
|
||||
|
||||
((InVMNodeManager) nodeManager).failoverPause = 5000L;
|
||||
if (nodeManager instanceof InVMNodeManager) {
|
||||
((InVMNodeManager) nodeManager).failoverPause = 5000L;
|
||||
}
|
||||
|
||||
ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
|
||||
|
||||
|
|
|
@ -153,8 +153,15 @@ public abstract class FailoverTestBase extends ActiveMQTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Override this if is needed a different implementation of {@link NodeManager} to be used into {@link #createConfigs()}.
|
||||
*/
|
||||
protected NodeManager createNodeManager() {
|
||||
return new InVMNodeManager(false);
|
||||
}
|
||||
|
||||
protected void createConfigs() throws Exception {
|
||||
nodeManager = new InVMNodeManager(false);
|
||||
nodeManager = createNodeManager();
|
||||
TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
|
||||
TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
|
||||
|
||||
|
@ -167,6 +174,14 @@ public abstract class FailoverTestBase extends ActiveMQTestBase {
|
|||
liveServer = createTestableServer(liveConfig);
|
||||
}
|
||||
|
||||
/**
|
||||
* Override this if is needed a different implementation of {@link NodeManager} to be used into {@link #createReplicatedConfigs()}.
|
||||
*/
|
||||
protected NodeManager createReplicatedBackupNodeManager(Configuration backupConfig) {
|
||||
return new InVMNodeManager(true, backupConfig.getJournalLocation());
|
||||
}
|
||||
|
||||
|
||||
protected void createReplicatedConfigs() throws Exception {
|
||||
final TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
|
||||
final TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
|
||||
|
@ -180,7 +195,7 @@ public abstract class FailoverTestBase extends ActiveMQTestBase {
|
|||
backupConfig.setBindingsDirectory(getBindingsDir(0, true)).setJournalDirectory(getJournalDir(0, true)).setPagingDirectory(getPageDir(0, true)).setLargeMessagesDirectory(getLargeMessagesDir(0, true)).setSecurityEnabled(false);
|
||||
|
||||
setupHAPolicyConfiguration();
|
||||
nodeManager = new InVMNodeManager(true, backupConfig.getJournalLocation());
|
||||
nodeManager = createReplicatedBackupNodeManager(backupConfig);
|
||||
|
||||
backupServer = createTestableServer(backupConfig);
|
||||
|
||||
|
|
|
@ -16,9 +16,16 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.cluster.failover;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
|
||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
||||
|
@ -26,12 +33,45 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
|||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||
import org.apache.activemq.artemis.core.server.NodeManager;
|
||||
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
|
||||
import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager;
|
||||
import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider;
|
||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
|
||||
import org.hamcrest.core.Is;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Assume;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class NettyFailoverTest extends FailoverTest {
|
||||
|
||||
private static final long JDBC_LOCK_EXPIRATION_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockExpirationMillis();
|
||||
private static final long JDBC_LOCK_RENEW_PERIOD_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis();
|
||||
private static final long JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockAcquisitionTimeoutMillis();
|
||||
private static final SQLProvider SQL_PROVIDER = new DerbySQLProvider.Factory().create(ActiveMQDefaultConfiguration.getDefaultNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER);
|
||||
private static final String JDBC_URL = "jdbc:derby:memory:server_lock_db;create=true";
|
||||
private static final String DRIVER_CLASS_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
|
||||
|
||||
public enum NodeManagerType {
|
||||
InVM, Jdbc
|
||||
}
|
||||
|
||||
@Parameterized.Parameters(name = "{0} Node Manager")
|
||||
public static Iterable<? extends Object> nodeManagerTypes() {
|
||||
return Arrays.asList(new Object[][]{{NodeManagerType.Jdbc}, {NodeManagerType.InVM}});
|
||||
}
|
||||
|
||||
@Parameterized.Parameter
|
||||
public NodeManagerType nodeManagerType;
|
||||
|
||||
@Override
|
||||
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
|
||||
return getNettyAcceptorTransportConfiguration(live);
|
||||
|
@ -42,6 +82,51 @@ public class NettyFailoverTest extends FailoverTest {
|
|||
return getNettyConnectorTransportConfiguration(live);
|
||||
}
|
||||
|
||||
private ScheduledExecutorService scheduledExecutorService;
|
||||
private ExecutorService executor;
|
||||
|
||||
@Override
|
||||
protected NodeManager createReplicatedBackupNodeManager(Configuration backupConfig) {
|
||||
Assume.assumeThat("Replicated backup is supported only by " + NodeManagerType.InVM + " Node Manager", nodeManagerType, Is.is(NodeManagerType.InVM));
|
||||
return super.createReplicatedBackupNodeManager(backupConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected NodeManager createNodeManager() {
|
||||
|
||||
switch (nodeManagerType) {
|
||||
|
||||
case InVM:
|
||||
return new InVMNodeManager(false);
|
||||
case Jdbc:
|
||||
//It can uses an in memory JavaDB: the failover tests are in process
|
||||
final ThreadFactory daemonThreadFactory = t -> {
|
||||
final Thread th = new Thread(t);
|
||||
th.setDaemon(true);
|
||||
return th;
|
||||
};
|
||||
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(daemonThreadFactory);
|
||||
executor = Executors.newFixedThreadPool(2, daemonThreadFactory);
|
||||
final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
|
||||
return JdbcNodeManager.usingConnectionUrl(UUID.randomUUID().toString(), JDBC_LOCK_EXPIRATION_MILLIS, JDBC_LOCK_RENEW_PERIOD_MILLIS, JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS, JDBC_URL, DRIVER_CLASS_NAME, SQL_PROVIDER, scheduledExecutorService, executorFactory, (code, message, file) -> {
|
||||
code.printStackTrace();
|
||||
Assert.fail(message);
|
||||
});
|
||||
default:
|
||||
throw new AssertionError("enum type not supported!");
|
||||
}
|
||||
}
|
||||
|
||||
@After
|
||||
public void shutDownExecutors() {
|
||||
if (scheduledExecutorService != null) {
|
||||
executor.shutdown();
|
||||
scheduledExecutorService.shutdown();
|
||||
this.executor = null;
|
||||
this.scheduledExecutorService = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 120000)
|
||||
public void testFailoverWithHostAlias() throws Exception {
|
||||
Map<String, Object> params = new HashMap<>();
|
||||
|
|
Loading…
Reference in New Issue