ARTEMIS-1509 Add support for JdbcNodeManager into the NettyFailoverTest

(cherry picked from commit 8e8a6f0faf)
This commit is contained in:
Francesco Nigro 2017-11-07 17:44:57 +01:00 committed by Clebert Suconic
parent 590fbcf1d2
commit a0f3da5d72
3 changed files with 121 additions and 9 deletions

View File

@ -110,7 +110,9 @@ public class FailoverTest extends FailoverTestBase {
public void testTimeoutOnFailover() throws Exception { public void testTimeoutOnFailover() throws Exception {
locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(100); locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(100);
((InVMNodeManager) nodeManager).failoverPause = 500; if (nodeManager instanceof InVMNodeManager) {
((InVMNodeManager) nodeManager).failoverPause = 500L;
}
ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator); ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
@ -175,7 +177,9 @@ public class FailoverTest extends FailoverTestBase {
public void testTimeoutOnFailoverConsume() throws Exception { public void testTimeoutOnFailoverConsume() throws Exception {
locator.setCallTimeout(5000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setBlockOnAcknowledge(true).setReconnectAttempts(300).setRetryInterval(100).setAckBatchSize(0); locator.setCallTimeout(5000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setBlockOnAcknowledge(true).setReconnectAttempts(300).setRetryInterval(100).setAckBatchSize(0);
if (nodeManager instanceof InVMNodeManager) {
((InVMNodeManager) nodeManager).failoverPause = 5000L; ((InVMNodeManager) nodeManager).failoverPause = 5000L;
}
ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator); ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
@ -236,7 +240,9 @@ public class FailoverTest extends FailoverTestBase {
public void testTimeoutOnFailoverConsumeBlocked() throws Exception { public void testTimeoutOnFailoverConsumeBlocked() throws Exception {
locator.setCallTimeout(5000).setBlockOnNonDurableSend(true).setConsumerWindowSize(0).setBlockOnDurableSend(true).setAckBatchSize(0).setBlockOnAcknowledge(true).setReconnectAttempts(-1).setAckBatchSize(0); locator.setCallTimeout(5000).setBlockOnNonDurableSend(true).setConsumerWindowSize(0).setBlockOnDurableSend(true).setAckBatchSize(0).setBlockOnAcknowledge(true).setReconnectAttempts(-1).setAckBatchSize(0);
if (nodeManager instanceof InVMNodeManager) {
((InVMNodeManager) nodeManager).failoverPause = 5000L; ((InVMNodeManager) nodeManager).failoverPause = 5000L;
}
ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator); ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
@ -329,7 +335,9 @@ public class FailoverTest extends FailoverTestBase {
public void testTimeoutOnFailoverTransactionCommit() throws Exception { public void testTimeoutOnFailoverTransactionCommit() throws Exception {
locator.setCallTimeout(5000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(100); locator.setCallTimeout(5000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(100);
if (nodeManager instanceof InVMNodeManager) {
((InVMNodeManager) nodeManager).failoverPause = 5000L; ((InVMNodeManager) nodeManager).failoverPause = 5000L;
}
ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator); ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
@ -396,7 +404,9 @@ public class FailoverTest extends FailoverTestBase {
public void testTimeoutOnFailoverTransactionCommitTimeoutCommunication() throws Exception { public void testTimeoutOnFailoverTransactionCommitTimeoutCommunication() throws Exception {
locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(500); locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(500);
if (nodeManager instanceof InVMNodeManager) {
((InVMNodeManager) nodeManager).failoverPause = 6000L; ((InVMNodeManager) nodeManager).failoverPause = 6000L;
}
ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator); ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
final ClientSession session = createSession(sf1, false, false, false); final ClientSession session = createSession(sf1, false, false, false);
@ -473,7 +483,9 @@ public class FailoverTest extends FailoverTestBase {
public void testTimeoutOnFailoverTransactionRollback() throws Exception { public void testTimeoutOnFailoverTransactionRollback() throws Exception {
locator.setCallTimeout(2000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(100); locator.setCallTimeout(2000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(100);
if (nodeManager instanceof InVMNodeManager) {
((InVMNodeManager) nodeManager).failoverPause = 5000L; ((InVMNodeManager) nodeManager).failoverPause = 5000L;
}
ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator); ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);

View File

@ -153,8 +153,15 @@ public abstract class FailoverTestBase extends ActiveMQTestBase {
} }
} }
/**
* Override this if is needed a different implementation of {@link NodeManager} to be used into {@link #createConfigs()}.
*/
protected NodeManager createNodeManager() {
return new InVMNodeManager(false);
}
protected void createConfigs() throws Exception { protected void createConfigs() throws Exception {
nodeManager = new InVMNodeManager(false); nodeManager = createNodeManager();
TransportConfiguration liveConnector = getConnectorTransportConfiguration(true); TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
TransportConfiguration backupConnector = getConnectorTransportConfiguration(false); TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
@ -167,6 +174,14 @@ public abstract class FailoverTestBase extends ActiveMQTestBase {
liveServer = createTestableServer(liveConfig); liveServer = createTestableServer(liveConfig);
} }
/**
* Override this if is needed a different implementation of {@link NodeManager} to be used into {@link #createReplicatedConfigs()}.
*/
protected NodeManager createReplicatedBackupNodeManager(Configuration backupConfig) {
return new InVMNodeManager(true, backupConfig.getJournalLocation());
}
protected void createReplicatedConfigs() throws Exception { protected void createReplicatedConfigs() throws Exception {
final TransportConfiguration liveConnector = getConnectorTransportConfiguration(true); final TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
final TransportConfiguration backupConnector = getConnectorTransportConfiguration(false); final TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
@ -180,7 +195,7 @@ public abstract class FailoverTestBase extends ActiveMQTestBase {
backupConfig.setBindingsDirectory(getBindingsDir(0, true)).setJournalDirectory(getJournalDir(0, true)).setPagingDirectory(getPageDir(0, true)).setLargeMessagesDirectory(getLargeMessagesDir(0, true)).setSecurityEnabled(false); backupConfig.setBindingsDirectory(getBindingsDir(0, true)).setJournalDirectory(getJournalDir(0, true)).setPagingDirectory(getPageDir(0, true)).setLargeMessagesDirectory(getLargeMessagesDir(0, true)).setSecurityEnabled(false);
setupHAPolicyConfiguration(); setupHAPolicyConfiguration();
nodeManager = new InVMNodeManager(true, backupConfig.getJournalLocation()); nodeManager = createReplicatedBackupNodeManager(backupConfig);
backupServer = createTestableServer(backupConfig); backupServer = createTestableServer(backupConfig);

View File

@ -16,9 +16,16 @@
*/ */
package org.apache.activemq.artemis.tests.integration.cluster.failover; package org.apache.activemq.artemis.tests.integration.cluster.failover;
import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@ -26,12 +33,45 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager;
import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class NettyFailoverTest extends FailoverTest { public class NettyFailoverTest extends FailoverTest {
private static final long JDBC_LOCK_EXPIRATION_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockExpirationMillis();
private static final long JDBC_LOCK_RENEW_PERIOD_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis();
private static final long JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockAcquisitionTimeoutMillis();
private static final SQLProvider SQL_PROVIDER = new DerbySQLProvider.Factory().create(ActiveMQDefaultConfiguration.getDefaultNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER);
private static final String JDBC_URL = "jdbc:derby:memory:server_lock_db;create=true";
private static final String DRIVER_CLASS_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
public enum NodeManagerType {
InVM, Jdbc
}
@Parameterized.Parameters(name = "{0} Node Manager")
public static Iterable<? extends Object> nodeManagerTypes() {
return Arrays.asList(new Object[][]{{NodeManagerType.Jdbc}, {NodeManagerType.InVM}});
}
@Parameterized.Parameter
public NodeManagerType nodeManagerType;
@Override @Override
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) { protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
return getNettyAcceptorTransportConfiguration(live); return getNettyAcceptorTransportConfiguration(live);
@ -42,7 +82,52 @@ public class NettyFailoverTest extends FailoverTest {
return getNettyConnectorTransportConfiguration(live); return getNettyConnectorTransportConfiguration(live);
} }
@Test private ScheduledExecutorService scheduledExecutorService;
private ExecutorService executor;
@Override
protected NodeManager createReplicatedBackupNodeManager(Configuration backupConfig) {
Assume.assumeThat("Replicated backup is supported only by " + NodeManagerType.InVM + " Node Manager", nodeManagerType, Is.is(NodeManagerType.InVM));
return super.createReplicatedBackupNodeManager(backupConfig);
}
@Override
protected NodeManager createNodeManager() {
switch (nodeManagerType) {
case InVM:
return new InVMNodeManager(false);
case Jdbc:
//It can uses an in memory JavaDB: the failover tests are in process
final ThreadFactory daemonThreadFactory = t -> {
final Thread th = new Thread(t);
th.setDaemon(true);
return th;
};
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(daemonThreadFactory);
executor = Executors.newFixedThreadPool(2, daemonThreadFactory);
final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
return JdbcNodeManager.usingConnectionUrl(UUID.randomUUID().toString(), JDBC_LOCK_EXPIRATION_MILLIS, JDBC_LOCK_RENEW_PERIOD_MILLIS, JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS, JDBC_URL, DRIVER_CLASS_NAME, SQL_PROVIDER, scheduledExecutorService, executorFactory, (code, message, file) -> {
code.printStackTrace();
Assert.fail(message);
});
default:
throw new AssertionError("enum type not supported!");
}
}
@After
public void shutDownExecutors() {
if (scheduledExecutorService != null) {
executor.shutdown();
scheduledExecutorService.shutdown();
this.executor = null;
this.scheduledExecutorService = null;
}
}
@Test(timeout = 120000)
public void testFailoverWithHostAlias() throws Exception { public void testFailoverWithHostAlias() throws Exception {
Map<String, Object> params = new HashMap<>(); Map<String, Object> params = new HashMap<>();
params.put(TransportConstants.HOST_PROP_NAME, "127.0.0.1"); params.put(TransportConstants.HOST_PROP_NAME, "127.0.0.1");