This commit is contained in:
Clebert Suconic 2021-02-25 10:19:02 -05:00
commit 9d4f88c1e5
3 changed files with 21 additions and 13 deletions

View File

@ -1546,20 +1546,22 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
if (receivedTopology) { if (receivedTopology) {
return; return;
} }
TransportConfiguration[] newInitialconnectors = (TransportConfiguration[]) Array.newInstance(TransportConfiguration.class, newConnectors.size());
int count = 0; final List<TransportConfiguration> newInitialconnectors = new ArrayList<>(newConnectors.size());
for (DiscoveryEntry entry : newConnectors) { for (DiscoveryEntry entry : newConnectors) {
newInitialconnectors[count++] = entry.getConnector();
if (ha && topology.getMember(entry.getNodeID()) == null) { if (ha && topology.getMember(entry.getNodeID()) == null) {
TopologyMemberImpl member = new TopologyMemberImpl(entry.getNodeID(), null, null, entry.getConnector(), null); TopologyMemberImpl member = new TopologyMemberImpl(entry.getNodeID(), null, null, entry.getConnector(), null);
// on this case we set it as zero as any update coming from server should be accepted // on this case we set it as zero as any update coming from server should be accepted
topology.updateMember(0, entry.getNodeID(), member); topology.updateMember(0, entry.getNodeID(), member);
} }
// ignore its own transport connector
if (!entry.getConnector().equals(clusterTransportConfiguration)) {
newInitialconnectors.add(entry.getConnector());
}
} }
this.initialConnectors = newInitialconnectors.length == 0 ? null : newInitialconnectors; this.initialConnectors = newInitialconnectors.toArray(new TransportConfiguration[newInitialconnectors.size()]);
if (clusterConnection && !receivedTopology && this.getNumInitialConnectors() > 0) { if (clusterConnection && !receivedTopology && this.getNumInitialConnectors() > 0) {
// The node is alone in the cluster. We create a connection to the new node // The node is alone in the cluster. We create a connection to the new node

View File

@ -158,15 +158,17 @@ public class ClusterController implements ActiveMQComponent {
/** /**
* add a locator for a cluster connection. * add a locator for a cluster connection.
* *
* @param name the cluster connection name * @param name the cluster connection name
* @param dg the discovery group to use * @param dg the discovery group to use
* @param config the cluster connection config * @param config the cluster connection config
* @param connector the cluster connector configuration
*/ */
public void addClusterConnection(SimpleString name, public void addClusterConnection(SimpleString name,
DiscoveryGroupConfiguration dg, DiscoveryGroupConfiguration dg,
ClusterConnectionConfiguration config) { ClusterConnectionConfiguration config,
TransportConfiguration connector) {
ServerLocatorImpl serverLocator = (ServerLocatorImpl) ActiveMQClient.createServerLocatorWithHA(dg); ServerLocatorImpl serverLocator = (ServerLocatorImpl) ActiveMQClient.createServerLocatorWithHA(dg);
configAndAdd(name, serverLocator, config); configAndAdd(name, serverLocator, config, connector);
} }
/** /**
@ -179,12 +181,13 @@ public class ClusterController implements ActiveMQComponent {
TransportConfiguration[] tcConfigs, TransportConfiguration[] tcConfigs,
ClusterConnectionConfiguration config) { ClusterConnectionConfiguration config) {
ServerLocatorImpl serverLocator = (ServerLocatorImpl) ActiveMQClient.createServerLocatorWithHA(tcConfigs); ServerLocatorImpl serverLocator = (ServerLocatorImpl) ActiveMQClient.createServerLocatorWithHA(tcConfigs);
configAndAdd(name, serverLocator, config); configAndAdd(name, serverLocator, config, null);
} }
private void configAndAdd(SimpleString name, private void configAndAdd(SimpleString name,
ServerLocatorInternal serverLocator, ServerLocatorInternal serverLocator,
ClusterConnectionConfiguration config) { ClusterConnectionConfiguration config,
TransportConfiguration connector) {
serverLocator.setConnectionTTL(config.getConnectionTTL()); serverLocator.setConnectionTTL(config.getConnectionTTL());
serverLocator.setClientFailureCheckPeriod(config.getClientFailureCheckPeriod()); serverLocator.setClientFailureCheckPeriod(config.getClientFailureCheckPeriod());
//if the cluster isn't available we want to hang around until it is //if the cluster isn't available we want to hang around until it is
@ -198,6 +201,9 @@ public class ClusterController implements ActiveMQComponent {
//this is used for replication so need to use the server packet decoder //this is used for replication so need to use the server packet decoder
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator, server.getStorageManager())); serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator, server.getStorageManager()));
serverLocator.setThreadPools(server.getThreadPool(), server.getScheduledPool()); serverLocator.setThreadPools(server.getThreadPool(), server.getScheduledPool());
if (connector != null) {
serverLocator.setClusterTransportConfiguration(connector);
}
try { try {
serverLocator.initialize(); serverLocator.initialize();
} catch (Exception e) { } catch (Exception e) {

View File

@ -609,7 +609,7 @@ public final class ClusterManager implements ActiveMQComponent {
clusterConnection = new ClusterConnectionImpl(this, dg, connector, new SimpleString(config.getName()), new SimpleString(config.getAddress() != null ? config.getAddress() : ""), config.getMinLargeMessageSize(), config.getClientFailureCheckPeriod(), config.getConnectionTTL(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getCallTimeout(), config.getCallFailoverTimeout(), config.isDuplicateDetection(), config.getMessageLoadBalancingType(), config.getConfirmationWindowSize(), config.getProducerWindowSize(), executorFactory, server, postOffice, managementService, scheduledExecutor, config.getMaxHops(), nodeManager, server.getConfiguration().getClusterUser(), server.getConfiguration().getClusterPassword(), config.isAllowDirectConnectionsOnly(), config.getClusterNotificationInterval(), config.getClusterNotificationAttempts()); clusterConnection = new ClusterConnectionImpl(this, dg, connector, new SimpleString(config.getName()), new SimpleString(config.getAddress() != null ? config.getAddress() : ""), config.getMinLargeMessageSize(), config.getClientFailureCheckPeriod(), config.getConnectionTTL(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getCallTimeout(), config.getCallFailoverTimeout(), config.isDuplicateDetection(), config.getMessageLoadBalancingType(), config.getConfirmationWindowSize(), config.getProducerWindowSize(), executorFactory, server, postOffice, managementService, scheduledExecutor, config.getMaxHops(), nodeManager, server.getConfiguration().getClusterUser(), server.getConfiguration().getClusterPassword(), config.isAllowDirectConnectionsOnly(), config.getClusterNotificationInterval(), config.getClusterNotificationAttempts());
clusterController.addClusterConnection(clusterConnection.getName(), dg, config); clusterController.addClusterConnection(clusterConnection.getName(), dg, config, connector);
} else { } else {
TransportConfiguration[] tcConfigs = config.getTransportConfigurations(configuration); TransportConfiguration[] tcConfigs = config.getTransportConfigurations(configuration);