From a0f3da5d7208918fb2f45352d6f21996ac2ae935 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Tue, 7 Nov 2017 17:44:57 +0100 Subject: [PATCH] ARTEMIS-1509 Add support for JdbcNodeManager into the NettyFailoverTest (cherry picked from commit 8e8a6f0faf121a03b07b1baa23f5d91ec1cecdc3) --- .../cluster/failover/FailoverTest.java | 24 +++-- .../cluster/failover/FailoverTestBase.java | 19 +++- .../cluster/failover/NettyFailoverTest.java | 87 ++++++++++++++++++- 3 files changed, 121 insertions(+), 9 deletions(-) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java index 72b5ac473f..6a592c2170 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java @@ -110,7 +110,9 @@ public class FailoverTest extends FailoverTestBase { public void testTimeoutOnFailover() throws Exception { locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(100); - ((InVMNodeManager) nodeManager).failoverPause = 500; + if (nodeManager instanceof InVMNodeManager) { + ((InVMNodeManager) nodeManager).failoverPause = 500L; + } ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator); @@ -175,7 +177,9 @@ public class FailoverTest extends FailoverTestBase { public void testTimeoutOnFailoverConsume() throws Exception { locator.setCallTimeout(5000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setBlockOnAcknowledge(true).setReconnectAttempts(300).setRetryInterval(100).setAckBatchSize(0); - ((InVMNodeManager) nodeManager).failoverPause = 5000L; + if (nodeManager instanceof InVMNodeManager) { + ((InVMNodeManager) nodeManager).failoverPause = 5000L; + } ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator); @@ -236,7 +240,9 @@ public class FailoverTest extends FailoverTestBase { public void testTimeoutOnFailoverConsumeBlocked() throws Exception { locator.setCallTimeout(5000).setBlockOnNonDurableSend(true).setConsumerWindowSize(0).setBlockOnDurableSend(true).setAckBatchSize(0).setBlockOnAcknowledge(true).setReconnectAttempts(-1).setAckBatchSize(0); - ((InVMNodeManager) nodeManager).failoverPause = 5000L; + if (nodeManager instanceof InVMNodeManager) { + ((InVMNodeManager) nodeManager).failoverPause = 5000L; + } ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator); @@ -329,7 +335,9 @@ public class FailoverTest extends FailoverTestBase { public void testTimeoutOnFailoverTransactionCommit() throws Exception { locator.setCallTimeout(5000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(100); - ((InVMNodeManager) nodeManager).failoverPause = 5000L; + if (nodeManager instanceof InVMNodeManager) { + ((InVMNodeManager) nodeManager).failoverPause = 5000L; + } ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator); @@ -396,7 +404,9 @@ public class FailoverTest extends FailoverTestBase { public void testTimeoutOnFailoverTransactionCommitTimeoutCommunication() throws Exception { locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(500); - ((InVMNodeManager) nodeManager).failoverPause = 6000L; + if (nodeManager instanceof InVMNodeManager) { + ((InVMNodeManager) nodeManager).failoverPause = 6000L; + } ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator); final ClientSession session = createSession(sf1, false, false, false); @@ -473,7 +483,9 @@ public class FailoverTest extends FailoverTestBase { public void testTimeoutOnFailoverTransactionRollback() throws Exception { locator.setCallTimeout(2000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(100); - ((InVMNodeManager) nodeManager).failoverPause = 5000L; + if (nodeManager instanceof InVMNodeManager) { + ((InVMNodeManager) nodeManager).failoverPause = 5000L; + } ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java index b08ceb16f4..ec97663dd7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java @@ -153,8 +153,15 @@ public abstract class FailoverTestBase extends ActiveMQTestBase { } } + /** + * Override this if is needed a different implementation of {@link NodeManager} to be used into {@link #createConfigs()}. + */ + protected NodeManager createNodeManager() { + return new InVMNodeManager(false); + } + protected void createConfigs() throws Exception { - nodeManager = new InVMNodeManager(false); + nodeManager = createNodeManager(); TransportConfiguration liveConnector = getConnectorTransportConfiguration(true); TransportConfiguration backupConnector = getConnectorTransportConfiguration(false); @@ -167,6 +174,14 @@ public abstract class FailoverTestBase extends ActiveMQTestBase { liveServer = createTestableServer(liveConfig); } + /** + * Override this if is needed a different implementation of {@link NodeManager} to be used into {@link #createReplicatedConfigs()}. + */ + protected NodeManager createReplicatedBackupNodeManager(Configuration backupConfig) { + return new InVMNodeManager(true, backupConfig.getJournalLocation()); + } + + protected void createReplicatedConfigs() throws Exception { final TransportConfiguration liveConnector = getConnectorTransportConfiguration(true); final TransportConfiguration backupConnector = getConnectorTransportConfiguration(false); @@ -180,7 +195,7 @@ public abstract class FailoverTestBase extends ActiveMQTestBase { backupConfig.setBindingsDirectory(getBindingsDir(0, true)).setJournalDirectory(getJournalDir(0, true)).setPagingDirectory(getPageDir(0, true)).setLargeMessagesDirectory(getLargeMessagesDir(0, true)).setSecurityEnabled(false); setupHAPolicyConfiguration(); - nodeManager = new InVMNodeManager(true, backupConfig.getJournalLocation()); + nodeManager = createReplicatedBackupNodeManager(backupConfig); backupServer = createTestableServer(backupConfig); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java index 0c8d1161a3..d5b7292395 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java @@ -16,9 +16,16 @@ */ package org.apache.activemq.artemis.tests.integration.cluster.failover; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ClientConsumer; @@ -26,12 +33,45 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; +import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; +import org.apache.activemq.artemis.core.server.NodeManager; +import org.apache.activemq.artemis.core.server.impl.InVMNodeManager; +import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager; +import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider; +import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; +import org.apache.activemq.artemis.utils.ExecutorFactory; +import org.apache.activemq.artemis.utils.OrderedExecutorFactory; +import org.hamcrest.core.Is; +import org.junit.After; import org.junit.Assert; +import org.junit.Assume; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class NettyFailoverTest extends FailoverTest { + private static final long JDBC_LOCK_EXPIRATION_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockExpirationMillis(); + private static final long JDBC_LOCK_RENEW_PERIOD_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis(); + private static final long JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockAcquisitionTimeoutMillis(); + private static final SQLProvider SQL_PROVIDER = new DerbySQLProvider.Factory().create(ActiveMQDefaultConfiguration.getDefaultNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER); + private static final String JDBC_URL = "jdbc:derby:memory:server_lock_db;create=true"; + private static final String DRIVER_CLASS_NAME = "org.apache.derby.jdbc.EmbeddedDriver"; + + public enum NodeManagerType { + InVM, Jdbc + } + + @Parameterized.Parameters(name = "{0} Node Manager") + public static Iterable nodeManagerTypes() { + return Arrays.asList(new Object[][]{{NodeManagerType.Jdbc}, {NodeManagerType.InVM}}); + } + + @Parameterized.Parameter + public NodeManagerType nodeManagerType; + @Override protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) { return getNettyAcceptorTransportConfiguration(live); @@ -42,7 +82,52 @@ public class NettyFailoverTest extends FailoverTest { return getNettyConnectorTransportConfiguration(live); } - @Test + private ScheduledExecutorService scheduledExecutorService; + private ExecutorService executor; + + @Override + protected NodeManager createReplicatedBackupNodeManager(Configuration backupConfig) { + Assume.assumeThat("Replicated backup is supported only by " + NodeManagerType.InVM + " Node Manager", nodeManagerType, Is.is(NodeManagerType.InVM)); + return super.createReplicatedBackupNodeManager(backupConfig); + } + + @Override + protected NodeManager createNodeManager() { + + switch (nodeManagerType) { + + case InVM: + return new InVMNodeManager(false); + case Jdbc: + //It can uses an in memory JavaDB: the failover tests are in process + final ThreadFactory daemonThreadFactory = t -> { + final Thread th = new Thread(t); + th.setDaemon(true); + return th; + }; + scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(daemonThreadFactory); + executor = Executors.newFixedThreadPool(2, daemonThreadFactory); + final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor); + return JdbcNodeManager.usingConnectionUrl(UUID.randomUUID().toString(), JDBC_LOCK_EXPIRATION_MILLIS, JDBC_LOCK_RENEW_PERIOD_MILLIS, JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS, JDBC_URL, DRIVER_CLASS_NAME, SQL_PROVIDER, scheduledExecutorService, executorFactory, (code, message, file) -> { + code.printStackTrace(); + Assert.fail(message); + }); + default: + throw new AssertionError("enum type not supported!"); + } + } + + @After + public void shutDownExecutors() { + if (scheduledExecutorService != null) { + executor.shutdown(); + scheduledExecutorService.shutdown(); + this.executor = null; + this.scheduledExecutorService = null; + } + } + + @Test(timeout = 120000) public void testFailoverWithHostAlias() throws Exception { Map params = new HashMap<>(); params.put(TransportConstants.HOST_PROP_NAME, "127.0.0.1");