Merge #50 ttl fix from Howard

This commit is contained in:
jbertram 2014-12-31 11:05:07 -06:00
commit 16d74b2bec
5 changed files with 106 additions and 18 deletions

View File

@ -36,6 +36,7 @@ import org.apache.activemq.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.core.client.impl.ServerLocatorImpl; import org.apache.activemq.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.core.client.impl.ServerLocatorInternal; import org.apache.activemq.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.core.client.impl.Topology; import org.apache.activemq.core.client.impl.Topology;
import org.apache.activemq.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.core.protocol.core.Channel; import org.apache.activemq.core.protocol.core.Channel;
import org.apache.activemq.core.protocol.core.ChannelHandler; import org.apache.activemq.core.protocol.core.ChannelHandler;
import org.apache.activemq.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.core.protocol.core.CoreRemotingConnection;
@ -162,16 +163,12 @@ public class ClusterController implements ActiveMQComponent
* *
* @param name the cluster connection name * @param name the cluster connection name
* @param dg the discovery group to use * @param dg the discovery group to use
* @param config the cluster connection config
*/ */
public void addClusterConnection(SimpleString name, DiscoveryGroupConfiguration dg) public void addClusterConnection(SimpleString name, DiscoveryGroupConfiguration dg, ClusterConnectionConfiguration config)
{ {
ServerLocatorImpl serverLocator = (ServerLocatorImpl) ActiveMQClient.createServerLocatorWithHA(dg); ServerLocatorImpl serverLocator = (ServerLocatorImpl) ActiveMQClient.createServerLocatorWithHA(dg);
//if the cluster isn't available we want to hang around until it is configAndAdd(name, serverLocator, config);
serverLocator.setReconnectAttempts(-1);
serverLocator.setInitialConnectAttempts(-1);
//this is used for replication so need to use the server packet decoder
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
locators.put(name, serverLocator);
} }
/** /**
@ -180,9 +177,16 @@ public class ClusterController implements ActiveMQComponent
* @param name the cluster connection name * @param name the cluster connection name
* @param tcConfigs the transport configurations to use * @param tcConfigs the transport configurations to use
*/ */
public void addClusterConnection(SimpleString name, TransportConfiguration[] tcConfigs) public void addClusterConnection(SimpleString name, TransportConfiguration[] tcConfigs, ClusterConnectionConfiguration config)
{ {
ServerLocatorImpl serverLocator = (ServerLocatorImpl) ActiveMQClient.createServerLocatorWithHA(tcConfigs); ServerLocatorImpl serverLocator = (ServerLocatorImpl) ActiveMQClient.createServerLocatorWithHA(tcConfigs);
configAndAdd(name, serverLocator, config);
}
private void configAndAdd(SimpleString name, ServerLocatorInternal serverLocator, ClusterConnectionConfiguration config)
{
serverLocator.setConnectionTTL(config.getConnectionTTL());
serverLocator.setClientFailureCheckPeriod(config.getClientFailureCheckPeriod());
//if the cluster isn't available we want to hang around until it is //if the cluster isn't available we want to hang around until it is
serverLocator.setReconnectAttempts(-1); serverLocator.setReconnectAttempts(-1);
serverLocator.setInitialConnectAttempts(-1); serverLocator.setInitialConnectAttempts(-1);
@ -455,4 +459,10 @@ public class ClusterController implements ActiveMQComponent
} }
} }
} }
public ServerLocator getReplicationLocator()
{
return this.replicationLocator;
}
} }

View File

@ -751,7 +751,7 @@ public final class ClusterManager implements ActiveMQComponent
config.getClusterNotificationInterval(), config.getClusterNotificationInterval(),
config.getClusterNotificationAttempts()); config.getClusterNotificationAttempts());
clusterController.addClusterConnection(clusterConnection.getName(), dg); clusterController.addClusterConnection(clusterConnection.getName(), dg, config);
} }
else else
{ {
@ -794,7 +794,7 @@ public final class ClusterManager implements ActiveMQComponent
config.getClusterNotificationAttempts()); config.getClusterNotificationAttempts());
clusterController.addClusterConnection(clusterConnection.getName(), tcConfigs); clusterController.addClusterConnection(clusterConnection.getName(), tcConfigs, config);
} }

View File

@ -210,7 +210,7 @@ public abstract class FailoverTestBase extends ServiceTestBase
backupConfig = createDefaultConfig(); backupConfig = createDefaultConfig();
liveConfig = createDefaultConfig(); liveConfig = createDefaultConfig();
ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector); ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, null);
final String suffix = "_backup"; final String suffix = "_backup";
backupConfig.setBindingsDirectory(backupConfig.getBindingsDirectory() + suffix) backupConfig.setBindingsDirectory(backupConfig.getBindingsDirectory() + suffix)

View File

@ -45,6 +45,7 @@ import org.apache.activemq.api.core.client.ClientProducer;
import org.apache.activemq.api.core.client.ClientSession; import org.apache.activemq.api.core.client.ClientSession;
import org.apache.activemq.api.core.client.ClientSessionFactory; import org.apache.activemq.api.core.client.ClientSessionFactory;
import org.apache.activemq.api.core.client.ServerLocator; import org.apache.activemq.api.core.client.ServerLocator;
import org.apache.activemq.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.core.config.Configuration; import org.apache.activemq.core.config.Configuration;
import org.apache.activemq.core.config.ha.SharedStoreSlavePolicyConfiguration; import org.apache.activemq.core.config.ha.SharedStoreSlavePolicyConfiguration;
import org.apache.activemq.core.journal.EncodingSupport; import org.apache.activemq.core.journal.EncodingSupport;
@ -75,6 +76,8 @@ import org.apache.activemq.core.replication.ReplicationManager;
import org.apache.activemq.core.server.ActiveMQComponent; import org.apache.activemq.core.server.ActiveMQComponent;
import org.apache.activemq.core.server.ActiveMQServer; import org.apache.activemq.core.server.ActiveMQServer;
import org.apache.activemq.core.server.ServerMessage; import org.apache.activemq.core.server.ServerMessage;
import org.apache.activemq.core.server.cluster.ClusterController;
import org.apache.activemq.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.core.server.impl.ServerMessageImpl; import org.apache.activemq.core.server.impl.ServerMessageImpl;
import org.apache.activemq.core.settings.HierarchicalRepository; import org.apache.activemq.core.settings.HierarchicalRepository;
import org.apache.activemq.core.settings.impl.AddressSettings; import org.apache.activemq.core.settings.impl.AddressSettings;
@ -117,10 +120,30 @@ public final class ReplicationTest extends ServiceTestBase
private void setupServer(boolean backup, String... interceptors) throws Exception private void setupServer(boolean backup, String... interceptors) throws Exception
{ {
this.setupServer(false, backup, null, interceptors);
}
final TransportConfiguration liveConnector = TransportConfigurationUtils.getInVMConnector(true); private void setupServer(boolean useNetty, boolean backup,
final TransportConfiguration backupConnector = TransportConfigurationUtils.getInVMConnector(false); ExtraConfigurer extraConfig,
final TransportConfiguration backupAcceptor = TransportConfigurationUtils.getInVMAcceptor(false); String... incomingInterceptors) throws Exception
{
TransportConfiguration liveConnector = null;
TransportConfiguration liveAcceptor = null;
TransportConfiguration backupConnector = null;
TransportConfiguration backupAcceptor = null;
if (useNetty)
{
liveConnector = TransportConfigurationUtils.getNettyConnector(true, 0);
liveAcceptor = TransportConfigurationUtils.getNettyAcceptor(true, 0);
backupConnector = TransportConfigurationUtils.getNettyConnector(false, 0);
backupAcceptor = TransportConfigurationUtils.getNettyAcceptor(false, 0);
}
else
{
liveConnector = TransportConfigurationUtils.getInVMConnector(true);
backupConnector = TransportConfigurationUtils.getInVMConnector(false);
backupAcceptor = TransportConfigurationUtils.getInVMAcceptor(false);
}
final String suffix = "_backup"; final String suffix = "_backup";
Configuration liveConfig = createDefaultConfig(); Configuration liveConfig = createDefaultConfig();
@ -131,9 +154,14 @@ public final class ReplicationTest extends ServiceTestBase
.setJournalDirectory(ActiveMQDefaultConfiguration.getDefaultJournalDir() + suffix) .setJournalDirectory(ActiveMQDefaultConfiguration.getDefaultJournalDir() + suffix)
.setPagingDirectory(ActiveMQDefaultConfiguration.getDefaultPagingDir() + suffix) .setPagingDirectory(ActiveMQDefaultConfiguration.getDefaultPagingDir() + suffix)
.setLargeMessagesDirectory(ActiveMQDefaultConfiguration.getDefaultLargeMessagesDir() + suffix) .setLargeMessagesDirectory(ActiveMQDefaultConfiguration.getDefaultLargeMessagesDir() + suffix)
.setIncomingInterceptorClassNames(interceptors.length > 0 ? Arrays.asList(interceptors) : new ArrayList<String>()); .setIncomingInterceptorClassNames(incomingInterceptors.length > 0 ? Arrays.asList(incomingInterceptors) : new ArrayList<String>());
ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector); ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, liveAcceptor);
if (extraConfig != null)
{
extraConfig.config(liveConfig, backupConfig);
}
if (backup) if (backup)
{ {
@ -143,7 +171,15 @@ public final class ReplicationTest extends ServiceTestBase
} }
backupServer = createServer(backupConfig); backupServer = createServer(backupConfig);
locator = createInVMNonHALocator(); if (useNetty)
{
locator = createNettyNonHALocator();
}
else
{
locator = createInVMNonHALocator();
}
backupServer.start(); backupServer.start();
if (backup) if (backup)
{ {
@ -413,6 +449,37 @@ public final class ReplicationTest extends ServiceTestBase
} }
@Test
public void testClusterConnectionConfigs() throws Exception
{
final long ttlOverride = 123456789;
final long checkPeriodOverride = 987654321;
ExtraConfigurer configurer = new ExtraConfigurer() {
@Override
public void config(Configuration liveConfig, Configuration backupConfig)
{
List<ClusterConnectionConfiguration> ccList = backupConfig.getClusterConfigurations();
assertTrue(ccList.size() > 0);
ClusterConnectionConfiguration cc = ccList.get(0);
cc.setConnectionTTL(ttlOverride);
cc.setClientFailureCheckPeriod(checkPeriodOverride);
}
};
this.setupServer(true, true, configurer);
assertTrue(backupServer instanceof ActiveMQServerImpl);
ClusterController controller = backupServer.getClusterManager().getClusterController();
ServerLocator replicationLocator = controller.getReplicationLocator();
assertNotNull(replicationLocator);
assertEquals(ttlOverride, replicationLocator.getConnectionTTL());
assertEquals(checkPeriodOverride, replicationLocator.getClientFailureCheckPeriod());
}
/** /**
* @return * @return
* @throws Exception * @throws Exception
@ -901,4 +968,9 @@ public final class ReplicationTest extends ServiceTestBase
// no-op // no-op
} }
} }
private interface ExtraConfigurer
{
void config(Configuration liveConfig, Configuration backupConfig);
}
} }

View File

@ -34,13 +34,19 @@ public final class ReplicatedBackupUtils
TransportConfiguration backupConnector, TransportConfiguration backupConnector,
TransportConfiguration backupAcceptor, TransportConfiguration backupAcceptor,
Configuration liveConfig, Configuration liveConfig,
TransportConfiguration liveConnector) TransportConfiguration liveConnector,
TransportConfiguration liveAcceptor)
{ {
if (backupAcceptor != null) if (backupAcceptor != null)
{ {
backupConfig.clearAcceptorConfigurations().addAcceptorConfiguration(backupAcceptor); backupConfig.clearAcceptorConfigurations().addAcceptorConfiguration(backupAcceptor);
} }
if (liveAcceptor != null)
{
liveConfig.clearAcceptorConfigurations().addAcceptorConfiguration(liveAcceptor);
}
backupConfig.addConnectorConfiguration(BACKUP_NODE_NAME, backupConnector) backupConfig.addConnectorConfiguration(BACKUP_NODE_NAME, backupConnector)
.addConnectorConfiguration(LIVE_NODE_NAME, liveConnector) .addConnectorConfiguration(LIVE_NODE_NAME, liveConnector)
.addClusterConfiguration(UnitTestCase.basicClusterConnectionConfig(BACKUP_NODE_NAME, LIVE_NODE_NAME)) .addClusterConfiguration(UnitTestCase.basicClusterConnectionConfig(BACKUP_NODE_NAME, LIVE_NODE_NAME))