Bug 1174886 - HornetQ TTL / check-period not being respected

on the replication channel

The connection-ttl and client-failure-check-period are not passed
to the server locator used to create replication connection. So the
fix sets the two parameters in SharedNothingBackupActivation.
This commit is contained in:
Howard Gao 2014-12-23 20:38:35 +08:00
parent 2e14352055
commit 1d159e6da0
5 changed files with 106 additions and 18 deletions

View File

@ -36,6 +36,7 @@ import org.apache.activemq.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.core.client.impl.Topology;
import org.apache.activemq.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.core.protocol.core.Channel;
import org.apache.activemq.core.protocol.core.ChannelHandler;
import org.apache.activemq.core.protocol.core.CoreRemotingConnection;
@ -162,16 +163,12 @@ public class ClusterController implements ActiveMQComponent
*
* @param name the cluster connection name
* @param dg the discovery group to use
* @param config the cluster connection config
*/
public void addClusterConnection(SimpleString name, DiscoveryGroupConfiguration dg)
public void addClusterConnection(SimpleString name, DiscoveryGroupConfiguration dg, ClusterConnectionConfiguration config)
{
ServerLocatorImpl serverLocator = (ServerLocatorImpl) ActiveMQClient.createServerLocatorWithHA(dg);
//if the cluster isn't available we want to hang around until it is
serverLocator.setReconnectAttempts(-1);
serverLocator.setInitialConnectAttempts(-1);
//this is used for replication so need to use the server packet decoder
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
locators.put(name, serverLocator);
configAndAdd(name, serverLocator, config);
}
/**
@ -180,9 +177,16 @@ public class ClusterController implements ActiveMQComponent
* @param name the cluster connection name
* @param tcConfigs the transport configurations to use
*/
public void addClusterConnection(SimpleString name, TransportConfiguration[] tcConfigs)
public void addClusterConnection(SimpleString name, TransportConfiguration[] tcConfigs, ClusterConnectionConfiguration config)
{
ServerLocatorImpl serverLocator = (ServerLocatorImpl) ActiveMQClient.createServerLocatorWithHA(tcConfigs);
configAndAdd(name, serverLocator, config);
}
private void configAndAdd(SimpleString name, ServerLocatorInternal serverLocator, ClusterConnectionConfiguration config)
{
serverLocator.setConnectionTTL(config.getConnectionTTL());
serverLocator.setClientFailureCheckPeriod(config.getClientFailureCheckPeriod());
//if the cluster isn't available we want to hang around until it is
serverLocator.setReconnectAttempts(-1);
serverLocator.setInitialConnectAttempts(-1);
@ -455,4 +459,10 @@ public class ClusterController implements ActiveMQComponent
}
}
}
public ServerLocator getReplicationLocator()
{
return this.replicationLocator;
}
}

View File

@ -751,7 +751,7 @@ public final class ClusterManager implements ActiveMQComponent
config.getClusterNotificationInterval(),
config.getClusterNotificationAttempts());
clusterController.addClusterConnection(clusterConnection.getName(), dg);
clusterController.addClusterConnection(clusterConnection.getName(), dg, config);
}
else
{
@ -794,7 +794,7 @@ public final class ClusterManager implements ActiveMQComponent
config.getClusterNotificationAttempts());
clusterController.addClusterConnection(clusterConnection.getName(), tcConfigs);
clusterController.addClusterConnection(clusterConnection.getName(), tcConfigs, config);
}

View File

@ -210,7 +210,7 @@ public abstract class FailoverTestBase extends ServiceTestBase
backupConfig = createDefaultConfig();
liveConfig = createDefaultConfig();
ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector);
ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, null);
final String suffix = "_backup";
backupConfig.setBindingsDirectory(backupConfig.getBindingsDirectory() + suffix)

View File

@ -45,6 +45,7 @@ import org.apache.activemq.api.core.client.ClientProducer;
import org.apache.activemq.api.core.client.ClientSession;
import org.apache.activemq.api.core.client.ClientSessionFactory;
import org.apache.activemq.api.core.client.ServerLocator;
import org.apache.activemq.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.core.config.Configuration;
import org.apache.activemq.core.config.ha.SharedStoreSlavePolicyConfiguration;
import org.apache.activemq.core.journal.EncodingSupport;
@ -75,6 +76,8 @@ import org.apache.activemq.core.replication.ReplicationManager;
import org.apache.activemq.core.server.ActiveMQComponent;
import org.apache.activemq.core.server.ActiveMQServer;
import org.apache.activemq.core.server.ServerMessage;
import org.apache.activemq.core.server.cluster.ClusterController;
import org.apache.activemq.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.core.server.impl.ServerMessageImpl;
import org.apache.activemq.core.settings.HierarchicalRepository;
import org.apache.activemq.core.settings.impl.AddressSettings;
@ -117,10 +120,30 @@ public final class ReplicationTest extends ServiceTestBase
private void setupServer(boolean backup, String... interceptors) throws Exception
{
this.setupServer(false, backup, null, interceptors);
}
final TransportConfiguration liveConnector = TransportConfigurationUtils.getInVMConnector(true);
final TransportConfiguration backupConnector = TransportConfigurationUtils.getInVMConnector(false);
final TransportConfiguration backupAcceptor = TransportConfigurationUtils.getInVMAcceptor(false);
private void setupServer(boolean useNetty, boolean backup,
ExtraConfigurer extraConfig,
String... incomingInterceptors) throws Exception
{
TransportConfiguration liveConnector = null;
TransportConfiguration liveAcceptor = null;
TransportConfiguration backupConnector = null;
TransportConfiguration backupAcceptor = null;
if (useNetty)
{
liveConnector = TransportConfigurationUtils.getNettyConnector(true, 0);
liveAcceptor = TransportConfigurationUtils.getNettyAcceptor(true, 0);
backupConnector = TransportConfigurationUtils.getNettyConnector(false, 0);
backupAcceptor = TransportConfigurationUtils.getNettyAcceptor(false, 0);
}
else
{
liveConnector = TransportConfigurationUtils.getInVMConnector(true);
backupConnector = TransportConfigurationUtils.getInVMConnector(false);
backupAcceptor = TransportConfigurationUtils.getInVMAcceptor(false);
}
final String suffix = "_backup";
Configuration liveConfig = createDefaultConfig();
@ -131,9 +154,14 @@ public final class ReplicationTest extends ServiceTestBase
.setJournalDirectory(ActiveMQDefaultConfiguration.getDefaultJournalDir() + suffix)
.setPagingDirectory(ActiveMQDefaultConfiguration.getDefaultPagingDir() + suffix)
.setLargeMessagesDirectory(ActiveMQDefaultConfiguration.getDefaultLargeMessagesDir() + suffix)
.setIncomingInterceptorClassNames(interceptors.length > 0 ? Arrays.asList(interceptors) : new ArrayList<String>());
.setIncomingInterceptorClassNames(incomingInterceptors.length > 0 ? Arrays.asList(incomingInterceptors) : new ArrayList<String>());
ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector);
ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, liveAcceptor);
if (extraConfig != null)
{
extraConfig.config(liveConfig, backupConfig);
}
if (backup)
{
@ -143,7 +171,15 @@ public final class ReplicationTest extends ServiceTestBase
}
backupServer = createServer(backupConfig);
locator = createInVMNonHALocator();
if (useNetty)
{
locator = createNettyNonHALocator();
}
else
{
locator = createInVMNonHALocator();
}
backupServer.start();
if (backup)
{
@ -413,6 +449,37 @@ public final class ReplicationTest extends ServiceTestBase
}
@Test
public void testClusterConnectionConfigs() throws Exception
{
final long ttlOverride = 123456789;
final long checkPeriodOverride = 987654321;
ExtraConfigurer configurer = new ExtraConfigurer() {
@Override
public void config(Configuration liveConfig, Configuration backupConfig)
{
List<ClusterConnectionConfiguration> ccList = backupConfig.getClusterConfigurations();
assertTrue(ccList.size() > 0);
ClusterConnectionConfiguration cc = ccList.get(0);
cc.setConnectionTTL(ttlOverride);
cc.setClientFailureCheckPeriod(checkPeriodOverride);
}
};
this.setupServer(true, true, configurer);
assertTrue(backupServer instanceof ActiveMQServerImpl);
ClusterController controller = backupServer.getClusterManager().getClusterController();
ServerLocator replicationLocator = controller.getReplicationLocator();
assertNotNull(replicationLocator);
assertEquals(ttlOverride, replicationLocator.getConnectionTTL());
assertEquals(checkPeriodOverride, replicationLocator.getClientFailureCheckPeriod());
}
/**
* @return
* @throws Exception
@ -901,4 +968,9 @@ public final class ReplicationTest extends ServiceTestBase
// no-op
}
}
private interface ExtraConfigurer
{
void config(Configuration liveConfig, Configuration backupConfig);
}
}

View File

@ -34,13 +34,19 @@ public final class ReplicatedBackupUtils
TransportConfiguration backupConnector,
TransportConfiguration backupAcceptor,
Configuration liveConfig,
TransportConfiguration liveConnector)
TransportConfiguration liveConnector,
TransportConfiguration liveAcceptor)
{
if (backupAcceptor != null)
{
backupConfig.clearAcceptorConfigurations().addAcceptorConfiguration(backupAcceptor);
}
if (liveAcceptor != null)
{
liveConfig.clearAcceptorConfigurations().addAcceptorConfiguration(liveAcceptor);
}
backupConfig.addConnectorConfiguration(BACKUP_NODE_NAME, backupConnector)
.addConnectorConfiguration(LIVE_NODE_NAME, liveConnector)
.addClusterConfiguration(UnitTestCase.basicClusterConnectionConfig(BACKUP_NODE_NAME, LIVE_NODE_NAME))