diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/ClusterController.java b/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/ClusterController.java index 47c9ae8050..abbf9cb15c 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/ClusterController.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/ClusterController.java @@ -36,6 +36,7 @@ import org.apache.activemq.core.client.impl.ClientSessionFactoryInternal; import org.apache.activemq.core.client.impl.ServerLocatorImpl; import org.apache.activemq.core.client.impl.ServerLocatorInternal; import org.apache.activemq.core.client.impl.Topology; +import org.apache.activemq.core.config.ClusterConnectionConfiguration; import org.apache.activemq.core.protocol.core.Channel; import org.apache.activemq.core.protocol.core.ChannelHandler; import org.apache.activemq.core.protocol.core.CoreRemotingConnection; @@ -162,16 +163,12 @@ public class ClusterController implements ActiveMQComponent * * @param name the cluster connection name * @param dg the discovery group to use + * @param config the cluster connection config */ - public void addClusterConnection(SimpleString name, DiscoveryGroupConfiguration dg) + public void addClusterConnection(SimpleString name, DiscoveryGroupConfiguration dg, ClusterConnectionConfiguration config) { ServerLocatorImpl serverLocator = (ServerLocatorImpl) ActiveMQClient.createServerLocatorWithHA(dg); - //if the cluster isn't available we want to hang around until it is - serverLocator.setReconnectAttempts(-1); - serverLocator.setInitialConnectAttempts(-1); - //this is used for replication so need to use the server packet decoder - serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance()); - locators.put(name, serverLocator); + configAndAdd(name, serverLocator, config); } /** @@ -180,9 +177,16 @@ public class ClusterController implements ActiveMQComponent * @param name the cluster connection name * @param tcConfigs the transport configurations to use */ - public void addClusterConnection(SimpleString name, TransportConfiguration[] tcConfigs) + public void addClusterConnection(SimpleString name, TransportConfiguration[] tcConfigs, ClusterConnectionConfiguration config) { ServerLocatorImpl serverLocator = (ServerLocatorImpl) ActiveMQClient.createServerLocatorWithHA(tcConfigs); + configAndAdd(name, serverLocator, config); + } + + private void configAndAdd(SimpleString name, ServerLocatorInternal serverLocator, ClusterConnectionConfiguration config) + { + serverLocator.setConnectionTTL(config.getConnectionTTL()); + serverLocator.setClientFailureCheckPeriod(config.getClientFailureCheckPeriod()); //if the cluster isn't available we want to hang around until it is serverLocator.setReconnectAttempts(-1); serverLocator.setInitialConnectAttempts(-1); @@ -455,4 +459,10 @@ public class ClusterController implements ActiveMQComponent } } } + + public ServerLocator getReplicationLocator() + { + return this.replicationLocator; + } + } diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/ClusterManager.java b/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/ClusterManager.java index 9cfacfdb2f..7d09fdd9e3 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/ClusterManager.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/ClusterManager.java @@ -751,7 +751,7 @@ public final class ClusterManager implements ActiveMQComponent config.getClusterNotificationInterval(), config.getClusterNotificationAttempts()); - clusterController.addClusterConnection(clusterConnection.getName(), dg); + clusterController.addClusterConnection(clusterConnection.getName(), dg, config); } else { @@ -794,7 +794,7 @@ public final class ClusterManager implements ActiveMQComponent config.getClusterNotificationAttempts()); - clusterController.addClusterConnection(clusterConnection.getName(), tcConfigs); + clusterController.addClusterConnection(clusterConnection.getName(), tcConfigs, config); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/failover/FailoverTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/failover/FailoverTestBase.java index 77920e9dc9..a83661c046 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/failover/FailoverTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/failover/FailoverTestBase.java @@ -210,7 +210,7 @@ public abstract class FailoverTestBase extends ServiceTestBase backupConfig = createDefaultConfig(); liveConfig = createDefaultConfig(); - ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector); + ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, null); final String suffix = "_backup"; backupConfig.setBindingsDirectory(backupConfig.getBindingsDirectory() + suffix) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/replication/ReplicationTest.java index df77659f4a..5b2829c0e2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/replication/ReplicationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/replication/ReplicationTest.java @@ -45,6 +45,7 @@ import org.apache.activemq.api.core.client.ClientProducer; import org.apache.activemq.api.core.client.ClientSession; import org.apache.activemq.api.core.client.ClientSessionFactory; import org.apache.activemq.api.core.client.ServerLocator; +import org.apache.activemq.core.config.ClusterConnectionConfiguration; import org.apache.activemq.core.config.Configuration; import org.apache.activemq.core.config.ha.SharedStoreSlavePolicyConfiguration; import org.apache.activemq.core.journal.EncodingSupport; @@ -75,6 +76,8 @@ import org.apache.activemq.core.replication.ReplicationManager; import org.apache.activemq.core.server.ActiveMQComponent; import org.apache.activemq.core.server.ActiveMQServer; import org.apache.activemq.core.server.ServerMessage; +import org.apache.activemq.core.server.cluster.ClusterController; +import org.apache.activemq.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.core.server.impl.ServerMessageImpl; import org.apache.activemq.core.settings.HierarchicalRepository; import org.apache.activemq.core.settings.impl.AddressSettings; @@ -117,10 +120,30 @@ public final class ReplicationTest extends ServiceTestBase private void setupServer(boolean backup, String... interceptors) throws Exception { + this.setupServer(false, backup, null, interceptors); + } - final TransportConfiguration liveConnector = TransportConfigurationUtils.getInVMConnector(true); - final TransportConfiguration backupConnector = TransportConfigurationUtils.getInVMConnector(false); - final TransportConfiguration backupAcceptor = TransportConfigurationUtils.getInVMAcceptor(false); + private void setupServer(boolean useNetty, boolean backup, + ExtraConfigurer extraConfig, + String... incomingInterceptors) throws Exception + { + TransportConfiguration liveConnector = null; + TransportConfiguration liveAcceptor = null; + TransportConfiguration backupConnector = null; + TransportConfiguration backupAcceptor = null; + if (useNetty) + { + liveConnector = TransportConfigurationUtils.getNettyConnector(true, 0); + liveAcceptor = TransportConfigurationUtils.getNettyAcceptor(true, 0); + backupConnector = TransportConfigurationUtils.getNettyConnector(false, 0); + backupAcceptor = TransportConfigurationUtils.getNettyAcceptor(false, 0); + } + else + { + liveConnector = TransportConfigurationUtils.getInVMConnector(true); + backupConnector = TransportConfigurationUtils.getInVMConnector(false); + backupAcceptor = TransportConfigurationUtils.getInVMAcceptor(false); + } final String suffix = "_backup"; Configuration liveConfig = createDefaultConfig(); @@ -131,9 +154,14 @@ public final class ReplicationTest extends ServiceTestBase .setJournalDirectory(ActiveMQDefaultConfiguration.getDefaultJournalDir() + suffix) .setPagingDirectory(ActiveMQDefaultConfiguration.getDefaultPagingDir() + suffix) .setLargeMessagesDirectory(ActiveMQDefaultConfiguration.getDefaultLargeMessagesDir() + suffix) - .setIncomingInterceptorClassNames(interceptors.length > 0 ? Arrays.asList(interceptors) : new ArrayList()); + .setIncomingInterceptorClassNames(incomingInterceptors.length > 0 ? Arrays.asList(incomingInterceptors) : new ArrayList()); - ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector); + ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, liveAcceptor); + + if (extraConfig != null) + { + extraConfig.config(liveConfig, backupConfig); + } if (backup) { @@ -143,7 +171,15 @@ public final class ReplicationTest extends ServiceTestBase } backupServer = createServer(backupConfig); - locator = createInVMNonHALocator(); + if (useNetty) + { + locator = createNettyNonHALocator(); + } + else + { + locator = createInVMNonHALocator(); + } + backupServer.start(); if (backup) { @@ -413,6 +449,37 @@ public final class ReplicationTest extends ServiceTestBase } + @Test + public void testClusterConnectionConfigs() throws Exception + { + final long ttlOverride = 123456789; + final long checkPeriodOverride = 987654321; + + ExtraConfigurer configurer = new ExtraConfigurer() { + + @Override + public void config(Configuration liveConfig, Configuration backupConfig) + { + List ccList = backupConfig.getClusterConfigurations(); + assertTrue(ccList.size() > 0); + ClusterConnectionConfiguration cc = ccList.get(0); + cc.setConnectionTTL(ttlOverride); + cc.setClientFailureCheckPeriod(checkPeriodOverride); + } + }; + this.setupServer(true, true, configurer); + assertTrue(backupServer instanceof ActiveMQServerImpl); + + ClusterController controller = backupServer.getClusterManager().getClusterController(); + + ServerLocator replicationLocator = controller.getReplicationLocator(); + + assertNotNull(replicationLocator); + + assertEquals(ttlOverride, replicationLocator.getConnectionTTL()); + assertEquals(checkPeriodOverride, replicationLocator.getClientFailureCheckPeriod()); + } + /** * @return * @throws Exception @@ -901,4 +968,9 @@ public final class ReplicationTest extends ServiceTestBase // no-op } } + + private interface ExtraConfigurer + { + void config(Configuration liveConfig, Configuration backupConfig); + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/util/ReplicatedBackupUtils.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/util/ReplicatedBackupUtils.java index 36b2ed7312..4d61320692 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/util/ReplicatedBackupUtils.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/util/ReplicatedBackupUtils.java @@ -34,13 +34,19 @@ public final class ReplicatedBackupUtils TransportConfiguration backupConnector, TransportConfiguration backupAcceptor, Configuration liveConfig, - TransportConfiguration liveConnector) + TransportConfiguration liveConnector, + TransportConfiguration liveAcceptor) { if (backupAcceptor != null) { backupConfig.clearAcceptorConfigurations().addAcceptorConfiguration(backupAcceptor); } + if (liveAcceptor != null) + { + liveConfig.clearAcceptorConfigurations().addAcceptorConfiguration(liveAcceptor); + } + backupConfig.addConnectorConfiguration(BACKUP_NODE_NAME, backupConnector) .addConnectorConfiguration(LIVE_NODE_NAME, liveConnector) .addClusterConfiguration(UnitTestCase.basicClusterConnectionConfig(BACKUP_NODE_NAME, LIVE_NODE_NAME))