ARTEMIS-210 outbound RA connection load-balancing

Inbound sessions are always created from the same ActiveMQConnectionFactory
which means the load-balancing policy is applied to them in the expected
manner. However, outbound sessions are created from independent, unique
ActiveMQConnectionFactory instances which means that the load-balancing
doesn't follow the expected pattern.

This commit changes this behavior by caching each unique
ActiveMQConnectionFactory instance and using it for both inbound and outbound
sessions potentially. This ensures the sessions are load-balanced as
expected.
This commit is contained in:
jbertram 2015-08-24 17:14:53 -05:00
parent 25fc24dfa0
commit 7a1199c475
7 changed files with 631 additions and 78 deletions

View File

@ -570,6 +570,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
// if the topologyArray is null, we will use the initialConnectors // if the topologyArray is null, we will use the initialConnectors
if (usedTopology != null) { if (usedTopology != null) {
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
ActiveMQClientLogger.LOGGER.trace("Selecting connector from toplogy.");
}
int pos = loadBalancingPolicy.select(usedTopology.length); int pos = loadBalancingPolicy.select(usedTopology.length);
Pair<TransportConfiguration, TransportConfiguration> pair = usedTopology[pos]; Pair<TransportConfiguration, TransportConfiguration> pair = usedTopology[pos];
@ -577,6 +580,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
} }
else { else {
// Get from initialconnectors // Get from initialconnectors
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
ActiveMQClientLogger.LOGGER.trace("Selecting connector from initial connectors.");
}
int pos = loadBalancingPolicy.select(initialConnectors.length); int pos = loadBalancingPolicy.select(initialConnectors.length);
@ -1753,4 +1759,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
ServerLocatorImpl clone = new ServerLocatorImpl(this); ServerLocatorImpl clone = new ServerLocatorImpl(this);
return clone; return clone;
} }
public boolean isReceivedToplogy() {
return receivedTopology;
}
} }

View File

@ -807,4 +807,7 @@ public final class ActiveMQRAManagedConnection implements ManagedConnection, Exc
this.inManagedTx = inManagedTx; this.inManagedTx = inManagedTx;
} }
public ActiveMQConnectionFactory getConnectionFactory() {
return connectionFactory;
}
} }

View File

@ -121,6 +121,13 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
private String entries; private String entries;
/**
* Keep track of the connection factories that we create so we don't create a bunch of instances of factories
* configured the exact same way. Using the same connection factory instance also makes connection load-balancing
* behave as expected for outbound connections.
*/
private final Map<ConnectionFactoryProperties, ActiveMQConnectionFactory> knownConnectionFactories = new HashMap<ConnectionFactoryProperties, ActiveMQConnectionFactory>();
/** /**
* Constructor * Constructor
*/ */
@ -267,6 +274,8 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
managedConnectionFactories.clear(); managedConnectionFactories.clear();
knownConnectionFactories.clear();
if (defaultActiveMQConnectionFactory != null) { if (defaultActiveMQConnectionFactory != null) {
defaultActiveMQConnectionFactory.close(); defaultActiveMQConnectionFactory.close();
} }
@ -1586,107 +1595,124 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
public ActiveMQConnectionFactory createActiveMQConnectionFactory(final ConnectionFactoryProperties overrideProperties) { public ActiveMQConnectionFactory createActiveMQConnectionFactory(final ConnectionFactoryProperties overrideProperties) {
ActiveMQConnectionFactory cf; ActiveMQConnectionFactory cf;
List<String> connectorClassName = overrideProperties.getParsedConnectorClassNames() != null ? overrideProperties.getParsedConnectorClassNames() : raProperties.getParsedConnectorClassNames(); boolean known = false;
String discoveryAddress = overrideProperties.getDiscoveryAddress() != null ? overrideProperties.getDiscoveryAddress() : getDiscoveryAddress(); synchronized (knownConnectionFactories) {
if (!knownConnectionFactories.keySet().contains(overrideProperties)) {
List<String> connectorClassName = overrideProperties.getParsedConnectorClassNames() != null ? overrideProperties.getParsedConnectorClassNames() : raProperties.getParsedConnectorClassNames();
Boolean ha = overrideProperties.isHA() != null ? overrideProperties.isHA() : getHA(); String discoveryAddress = overrideProperties.getDiscoveryAddress() != null ? overrideProperties.getDiscoveryAddress() : getDiscoveryAddress();
String jgroupsFileName = overrideProperties.getJgroupsFile() != null ? overrideProperties.getJgroupsFile() : getJgroupsFile(); Boolean ha = overrideProperties.isHA() != null ? overrideProperties.isHA() : getHA();
String jgroupsChannel = overrideProperties.getJgroupsChannelName() != null ? overrideProperties.getJgroupsChannelName() : getJgroupsChannelName(); String jgroupsFileName = overrideProperties.getJgroupsFile() != null ? overrideProperties.getJgroupsFile() : getJgroupsFile();
String jgroupsLocatorClassName = raProperties.getJgroupsChannelLocatorClass(); String jgroupsChannel = overrideProperties.getJgroupsChannelName() != null ? overrideProperties.getJgroupsChannelName() : getJgroupsChannelName();
if (ha == null) { String jgroupsLocatorClassName = raProperties.getJgroupsChannelLocatorClass();
ha = ActiveMQClient.DEFAULT_IS_HA;
}
if (discoveryAddress != null || jgroupsFileName != null || jgroupsLocatorClassName != null) { if (ha == null) {
BroadcastEndpointFactory endpointFactory = null; ha = ActiveMQClient.DEFAULT_IS_HA;
if (jgroupsLocatorClassName != null) {
String jchannelRefName = raProperties.getJgroupsChannelRefName();
JChannel jchannel = ActiveMQRaUtils.locateJGroupsChannel(jgroupsLocatorClassName, jchannelRefName);
endpointFactory = new ChannelBroadcastEndpointFactory(jchannel, jgroupsChannel);
}
else if (discoveryAddress != null) {
Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort() : getDiscoveryPort();
if (discoveryPort == null) {
discoveryPort = ActiveMQClient.DEFAULT_DISCOVERY_PORT;
} }
String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress() : raProperties.getDiscoveryLocalBindAddress(); if (discoveryAddress != null || jgroupsFileName != null || jgroupsLocatorClassName != null) {
endpointFactory = new UDPBroadcastEndpointFactory().setGroupAddress(discoveryAddress).setGroupPort(discoveryPort).setLocalBindAddress(localBindAddress).setLocalBindPort(-1); BroadcastEndpointFactory endpointFactory = null;
}
else if (jgroupsFileName != null) {
endpointFactory = new JGroupsFileBroadcastEndpointFactory().setChannelName(jgroupsChannel).setFile(jgroupsFileName);
}
Long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout() : raProperties.getDiscoveryRefreshTimeout();
if (refreshTimeout == null) {
refreshTimeout = ActiveMQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT;
}
Long initialTimeout = overrideProperties.getDiscoveryInitialWaitTimeout() != null ? overrideProperties.getDiscoveryInitialWaitTimeout() : raProperties.getDiscoveryInitialWaitTimeout(); if (jgroupsLocatorClassName != null) {
String jchannelRefName = raProperties.getJgroupsChannelRefName();
JChannel jchannel = ActiveMQRaUtils.locateJGroupsChannel(jgroupsLocatorClassName, jchannelRefName);
endpointFactory = new ChannelBroadcastEndpointFactory(jchannel, jgroupsChannel);
}
else if (discoveryAddress != null) {
Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort() : getDiscoveryPort();
if (discoveryPort == null) {
discoveryPort = ActiveMQClient.DEFAULT_DISCOVERY_PORT;
}
if (initialTimeout == null) { String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress() : raProperties.getDiscoveryLocalBindAddress();
initialTimeout = ActiveMQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT; endpointFactory = new UDPBroadcastEndpointFactory().setGroupAddress(discoveryAddress).setGroupPort(discoveryPort).setLocalBindAddress(localBindAddress).setLocalBindPort(-1);
} }
else if (jgroupsFileName != null) {
endpointFactory = new JGroupsFileBroadcastEndpointFactory().setChannelName(jgroupsChannel).setFile(jgroupsFileName);
}
Long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout() : raProperties.getDiscoveryRefreshTimeout();
if (refreshTimeout == null) {
refreshTimeout = ActiveMQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT;
}
DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration().setRefreshTimeout(refreshTimeout).setDiscoveryInitialWaitTimeout(initialTimeout).setBroadcastEndpointFactory(endpointFactory); Long initialTimeout = overrideProperties.getDiscoveryInitialWaitTimeout() != null ? overrideProperties.getDiscoveryInitialWaitTimeout() : raProperties.getDiscoveryInitialWaitTimeout();
if (ActiveMQRALogger.LOGGER.isDebugEnabled()) { if (initialTimeout == null) {
ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for discovery=" + groupConfiguration + " with ha=" + ha); initialTimeout = ActiveMQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT;
} }
if (ha) { DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration().setRefreshTimeout(refreshTimeout).setDiscoveryInitialWaitTimeout(initialTimeout).setBroadcastEndpointFactory(endpointFactory);
cf = ActiveMQJMSClient.createConnectionFactoryWithHA(groupConfiguration, JMSFactoryType.XA_CF);
}
else {
cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF);
}
}
else if (connectorClassName != null) {
TransportConfiguration[] transportConfigurations = new TransportConfiguration[connectorClassName.size()];
List<Map<String, Object>> connectionParams; if (ActiveMQRALogger.LOGGER.isDebugEnabled()) {
if (overrideProperties.getParsedConnectorClassNames() != null) { ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for discovery=" + groupConfiguration + " with ha=" + ha);
connectionParams = overrideProperties.getParsedConnectionParameters(); }
}
else {
connectionParams = raProperties.getParsedConnectionParameters();
}
for (int i = 0; i < connectorClassName.size(); i++) { if (ha) {
TransportConfiguration tc; cf = ActiveMQJMSClient.createConnectionFactoryWithHA(groupConfiguration, JMSFactoryType.XA_CF);
if (connectionParams == null || i >= connectionParams.size()) { }
tc = new TransportConfiguration(connectorClassName.get(i)); else {
ActiveMQRALogger.LOGGER.debug("No connector params provided using default"); cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF);
}
}
else if (connectorClassName != null) {
TransportConfiguration[] transportConfigurations = new TransportConfiguration[connectorClassName.size()];
List<Map<String, Object>> connectionParams;
if (overrideProperties.getParsedConnectorClassNames() != null) {
connectionParams = overrideProperties.getParsedConnectionParameters();
}
else {
connectionParams = raProperties.getParsedConnectionParameters();
}
for (int i = 0; i < connectorClassName.size(); i++) {
TransportConfiguration tc;
if (connectionParams == null || i >= connectionParams.size()) {
tc = new TransportConfiguration(connectorClassName.get(i));
ActiveMQRALogger.LOGGER.debug("No connector params provided using default");
}
else {
tc = new TransportConfiguration(connectorClassName.get(i), connectionParams.get(i));
}
transportConfigurations[i] = tc;
}
if (ActiveMQRALogger.LOGGER.isDebugEnabled()) {
ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for transport=" +
Arrays.toString(transportConfigurations) + " with ha=" + ha);
}
if (ha) {
cf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.XA_CF, transportConfigurations);
}
else {
cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF, transportConfigurations);
}
} }
else { else {
tc = new TransportConfiguration(connectorClassName.get(i), connectionParams.get(i)); throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for ResourceAdapter Connection Factory");
} }
transportConfigurations[i] = tc; setParams(cf, overrideProperties);
} knownConnectionFactories.put(overrideProperties, cf);
if (ActiveMQRALogger.LOGGER.isDebugEnabled()) {
ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for transport=" +
Arrays.toString(transportConfigurations) + " with ha=" + ha);
}
if (ha) {
cf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.XA_CF, transportConfigurations);
} }
else { else {
cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF, transportConfigurations); cf = knownConnectionFactories.get(overrideProperties);
known = true;
} }
} }
else {
throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for ResourceAdapter Connection Factory"); if (known && cf.getServerLocator().isClosed()) {
knownConnectionFactories.remove(overrideProperties);
cf = createActiveMQConnectionFactory(overrideProperties);
} }
setParams(cf, overrideProperties);
return cf; return cf;
} }

View File

@ -682,4 +682,317 @@ public class ConnectionFactoryProperties {
public boolean isHasBeenUpdated() { public boolean isHasBeenUpdated() {
return hasBeenUpdated; return hasBeenUpdated;
} }
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
ConnectionFactoryProperties other = (ConnectionFactoryProperties) obj;
if (this.autoGroup == null) {
if (other.autoGroup != null)
return false;
}
else if (!this.autoGroup.equals(other.autoGroup))
return false;
if (this.blockOnAcknowledge == null) {
if (other.blockOnAcknowledge != null)
return false;
}
else if (!this.blockOnAcknowledge.equals(other.blockOnAcknowledge))
return false;
if (this.blockOnDurableSend == null) {
if (other.blockOnDurableSend != null)
return false;
}
else if (!this.blockOnDurableSend.equals(other.blockOnDurableSend))
return false;
if (this.blockOnNonDurableSend == null) {
if (other.blockOnNonDurableSend != null)
return false;
}
else if (!this.blockOnNonDurableSend.equals(other.blockOnNonDurableSend))
return false;
if (this.cacheLargeMessagesClient == null) {
if (other.cacheLargeMessagesClient != null)
return false;
}
else if (!this.cacheLargeMessagesClient.equals(other.cacheLargeMessagesClient))
return false;
if (this.compressLargeMessage == null) {
if (other.compressLargeMessage != null)
return false;
}
else if (!this.compressLargeMessage.equals(other.compressLargeMessage))
return false;
if (this.failoverOnInitialConnection == null) {
if (other.failoverOnInitialConnection != null)
return false;
}
else if (!this.failoverOnInitialConnection.equals(other.failoverOnInitialConnection))
return false;
if (this.ha == null) {
if (other.ha != null)
return false;
}
else if (!this.ha.equals(other.ha))
return false;
if (this.preAcknowledge == null) {
if (other.preAcknowledge != null)
return false;
}
else if (!this.preAcknowledge.equals(other.preAcknowledge))
return false;
if (this.callFailoverTimeout == null) {
if (other.callFailoverTimeout != null)
return false;
}
else if (!this.callFailoverTimeout.equals(other.callFailoverTimeout))
return false;
if (this.callTimeout == null) {
if (other.callTimeout != null)
return false;
}
else if (!this.callTimeout.equals(other.callTimeout))
return false;
if (this.clientFailureCheckPeriod == null) {
if (other.clientFailureCheckPeriod != null)
return false;
}
else if (!this.clientFailureCheckPeriod.equals(other.clientFailureCheckPeriod))
return false;
if (this.clientID == null) {
if (other.clientID != null)
return false;
}
else if (!this.clientID.equals(other.clientID))
return false;
if (this.confirmationWindowSize == null) {
if (other.confirmationWindowSize != null)
return false;
}
else if (!this.confirmationWindowSize.equals(other.confirmationWindowSize))
return false;
if (this.connectionLoadBalancingPolicyClassName == null) {
if (other.connectionLoadBalancingPolicyClassName != null)
return false;
}
else if (!this.connectionLoadBalancingPolicyClassName.equals(other.connectionLoadBalancingPolicyClassName))
return false;
if (this.connectionTTL == null) {
if (other.connectionTTL != null)
return false;
}
else if (!this.connectionTTL.equals(other.connectionTTL))
return false;
if (this.consumerMaxRate == null) {
if (other.consumerMaxRate != null)
return false;
}
else if (!this.consumerMaxRate.equals(other.consumerMaxRate))
return false;
if (this.consumerWindowSize == null) {
if (other.consumerWindowSize != null)
return false;
}
else if (!this.consumerWindowSize.equals(other.consumerWindowSize))
return false;
if (this.discoveryAddress == null) {
if (other.discoveryAddress != null)
return false;
}
else if (!this.discoveryAddress.equals(other.discoveryAddress))
return false;
if (this.discoveryInitialWaitTimeout == null) {
if (other.discoveryInitialWaitTimeout != null)
return false;
}
else if (!this.discoveryInitialWaitTimeout.equals(other.discoveryInitialWaitTimeout))
return false;
if (this.discoveryLocalBindAddress == null) {
if (other.discoveryLocalBindAddress != null)
return false;
}
else if (!this.discoveryLocalBindAddress.equals(other.discoveryLocalBindAddress))
return false;
if (this.discoveryPort == null) {
if (other.discoveryPort != null)
return false;
}
else if (!this.discoveryPort.equals(other.discoveryPort))
return false;
if (this.discoveryRefreshTimeout == null) {
if (other.discoveryRefreshTimeout != null)
return false;
}
else if (!this.discoveryRefreshTimeout.equals(other.discoveryRefreshTimeout))
return false;
if (this.dupsOKBatchSize == null) {
if (other.dupsOKBatchSize != null)
return false;
}
else if (!this.dupsOKBatchSize.equals(other.dupsOKBatchSize))
return false;
if (this.groupID == null) {
if (other.groupID != null)
return false;
}
else if (!this.groupID.equals(other.groupID))
return false;
if (this.initialConnectAttempts == null) {
if (other.initialConnectAttempts != null)
return false;
}
else if (!this.initialConnectAttempts.equals(other.initialConnectAttempts))
return false;
if (this.initialMessagePacketSize == null) {
if (other.initialMessagePacketSize != null)
return false;
}
else if (!this.initialMessagePacketSize.equals(other.initialMessagePacketSize))
return false;
if (this.jgroupsChannelName == null) {
if (other.jgroupsChannelName != null)
return false;
}
else if (!this.jgroupsChannelName.equals(other.jgroupsChannelName))
return false;
if (this.jgroupsFile == null) {
if (other.jgroupsFile != null)
return false;
}
else if (!this.jgroupsFile.equals(other.jgroupsFile))
return false;
if (this.maxRetryInterval == null) {
if (other.maxRetryInterval != null)
return false;
}
else if (!this.maxRetryInterval.equals(other.maxRetryInterval))
return false;
if (this.minLargeMessageSize == null) {
if (other.minLargeMessageSize != null)
return false;
}
else if (!this.minLargeMessageSize.equals(other.minLargeMessageSize))
return false;
if (this.producerMaxRate == null) {
if (other.producerMaxRate != null)
return false;
}
else if (!this.producerMaxRate.equals(other.producerMaxRate))
return false;
if (this.producerWindowSize == null) {
if (other.producerWindowSize != null)
return false;
}
else if (!this.producerWindowSize.equals(other.producerWindowSize))
return false;
if (this.reconnectAttempts == null) {
if (other.reconnectAttempts != null)
return false;
}
else if (!this.reconnectAttempts.equals(other.reconnectAttempts))
return false;
if (this.retryInterval == null) {
if (other.retryInterval != null)
return false;
}
else if (!this.retryInterval.equals(other.retryInterval))
return false;
if (this.retryIntervalMultiplier == null) {
if (other.retryIntervalMultiplier != null)
return false;
}
else if (!this.retryIntervalMultiplier.equals(other.retryIntervalMultiplier))
return false;
if (this.scheduledThreadPoolMaxSize == null) {
if (other.scheduledThreadPoolMaxSize != null)
return false;
}
else if (!this.scheduledThreadPoolMaxSize.equals(other.scheduledThreadPoolMaxSize))
return false;
if (this.threadPoolMaxSize == null) {
if (other.threadPoolMaxSize != null)
return false;
}
else if (!this.threadPoolMaxSize.equals(other.threadPoolMaxSize))
return false;
if (this.transactionBatchSize == null) {
if (other.transactionBatchSize != null)
return false;
}
else if (!this.transactionBatchSize.equals(other.transactionBatchSize))
return false;
if (this.useGlobalPools == null) {
if (other.useGlobalPools != null)
return false;
}
else if (!this.useGlobalPools.equals(other.useGlobalPools))
return false;
if (connectorClassName == null) {
if (other.connectorClassName != null)
return false;
}
else if (!connectorClassName.equals(other.connectorClassName))
return false;
if (this.connectionParameters == null) {
if (other.connectionParameters != null)
return false;
}
else if (!connectionParameters.equals(other.connectionParameters))
return false;
return true;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((ha == null) ? 0 : ha.hashCode());
result = prime * result + ((connectionLoadBalancingPolicyClassName == null) ? 0 : connectionLoadBalancingPolicyClassName.hashCode());
result = prime * result + ((jgroupsFile == null) ? 0 : jgroupsFile.hashCode());
result = prime * result + ((jgroupsChannelName == null) ? 0 : jgroupsChannelName.hashCode());
result = prime * result + ((discoveryAddress == null) ? 0 : discoveryAddress.hashCode());
result = prime * result + ((discoveryPort == null) ? 0 : discoveryPort.hashCode());
result = prime * result + ((discoveryLocalBindAddress == null) ? 0 : discoveryLocalBindAddress.hashCode());
result = prime * result + ((discoveryRefreshTimeout == null) ? 0 : discoveryRefreshTimeout.hashCode());
result = prime * result + ((discoveryInitialWaitTimeout == null) ? 0 : discoveryInitialWaitTimeout.hashCode());
result = prime * result + ((clientID == null) ? 0 : clientID.hashCode());
result = prime * result + ((dupsOKBatchSize == null) ? 0 : dupsOKBatchSize.hashCode());
result = prime * result + ((transactionBatchSize == null) ? 0 : transactionBatchSize.hashCode());
result = prime * result + ((clientFailureCheckPeriod == null) ? 0 : clientFailureCheckPeriod.hashCode());
result = prime * result + ((connectionTTL == null) ? 0 : connectionTTL.hashCode());
result = prime * result + ((cacheLargeMessagesClient == null) ? 0 : cacheLargeMessagesClient.hashCode());
result = prime * result + ((callTimeout == null) ? 0 : callTimeout.hashCode());
result = prime * result + ((callFailoverTimeout == null) ? 0 : callFailoverTimeout.hashCode());
result = prime * result + ((compressLargeMessage == null) ? 0 : compressLargeMessage.hashCode());
result = prime * result + ((consumerWindowSize == null) ? 0 : consumerWindowSize.hashCode());
result = prime * result + ((producerWindowSize == null) ? 0 : producerWindowSize.hashCode());
result = prime * result + ((consumerMaxRate == null) ? 0 : consumerMaxRate.hashCode());
result = prime * result + ((confirmationWindowSize == null) ? 0 : confirmationWindowSize.hashCode());
result = prime * result + ((failoverOnInitialConnection == null) ? 0 : failoverOnInitialConnection.hashCode());
result = prime * result + ((producerMaxRate == null) ? 0 : producerMaxRate.hashCode());
result = prime * result + ((minLargeMessageSize == null) ? 0 : minLargeMessageSize.hashCode());
result = prime * result + ((blockOnAcknowledge == null) ? 0 : blockOnAcknowledge.hashCode());
result = prime * result + ((blockOnNonDurableSend == null) ? 0 : blockOnNonDurableSend.hashCode());
result = prime * result + ((blockOnDurableSend == null) ? 0 : blockOnDurableSend.hashCode());
result = prime * result + ((autoGroup == null) ? 0 : autoGroup.hashCode());
result = prime * result + ((preAcknowledge == null) ? 0 : preAcknowledge.hashCode());
result = prime * result + ((initialConnectAttempts == null) ? 0 : initialConnectAttempts.hashCode());
result = prime * result + ((retryInterval == null) ? 0 : retryInterval.hashCode());
result = prime * result + ((retryIntervalMultiplier == null) ? 0 : retryIntervalMultiplier.hashCode());
result = prime * result + ((maxRetryInterval == null) ? 0 : maxRetryInterval.hashCode());
result = prime * result + ((reconnectAttempts == null) ? 0 : reconnectAttempts.hashCode());
result = prime * result + ((useGlobalPools == null) ? 0 : useGlobalPools.hashCode());
result = prime * result + ((initialMessagePacketSize == null) ? 0 : initialMessagePacketSize.hashCode());
result = prime * result + ((scheduledThreadPoolMaxSize == null) ? 0 : scheduledThreadPoolMaxSize.hashCode());
result = prime * result + ((threadPoolMaxSize == null) ? 0 : threadPoolMaxSize.hashCode());
result = prime * result + ((groupID == null) ? 0 : groupID.hashCode());
result = prime * result + ((connectorClassName == null) ? 0 : connectorClassName.hashCode());
result = prime * result + ((connectionParameters == null) ? 0 : connectionParameters.hashCode());
return result;
}
} }

View File

@ -16,19 +16,31 @@
*/ */
package org.apache.activemq.artemis.tests.integration.ra; package org.apache.activemq.artemis.tests.integration.ra;
import javax.jms.QueueConnection;
import javax.jms.Session;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.ra.ActiveMQRAConnectionFactory;
import org.apache.activemq.artemis.ra.ActiveMQRAConnectionFactoryImpl;
import org.apache.activemq.artemis.ra.ActiveMQRAConnectionManager;
import org.apache.activemq.artemis.ra.ActiveMQRAManagedConnection;
import org.apache.activemq.artemis.ra.ActiveMQRAManagedConnectionFactory;
import org.apache.activemq.artemis.ra.ActiveMQRASession;
import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter; import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter;
import org.apache.activemq.artemis.ra.inflow.ActiveMQActivation; import org.apache.activemq.artemis.ra.inflow.ActiveMQActivation;
import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec; import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec;
import org.junit.Test; import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class ActiveMQClusteredTest extends ActiveMQRAClusteredTestBase { public class ActiveMQClusteredTest extends ActiveMQRAClusteredTestBase {
/* /*
@ -107,4 +119,55 @@ public class ActiveMQClusteredTest extends ActiveMQRAClusteredTestBase {
assertNull(secondaryServer.locateQueue(tempQueue)); assertNull(secondaryServer.locateQueue(tempQueue));
} }
@Test
public void testOutboundLoadBalancing() throws Exception {
final int CONNECTION_COUNT = 100;
ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter();
List<Session> sessions = new ArrayList<>();
List<ActiveMQRAManagedConnection> managedConnections = new ArrayList<>();
try {
MyBootstrapContext ctx = new MyBootstrapContext();
qResourceAdapter.start(ctx);
ActiveMQRAConnectionManager qraConnectionManager = new ActiveMQRAConnectionManager();
ActiveMQRAManagedConnectionFactory mcf = new ActiveMQRAManagedConnectionFactory();
mcf.setResourceAdapter(qResourceAdapter);
ActiveMQRAConnectionFactory qraConnectionFactory = new ActiveMQRAConnectionFactoryImpl(mcf, qraConnectionManager);
QueueConnection queueConnection = qraConnectionFactory.createQueueConnection();
Session s = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
sessions.add(s);
ActiveMQRAManagedConnection mc = (ActiveMQRAManagedConnection) ((ActiveMQRASession) s).getManagedConnection();
managedConnections.add(mc);
ActiveMQConnectionFactory cf1 = mc.getConnectionFactory();
long timeout = 10000;
long now = System.currentTimeMillis();
while (!((ServerLocatorImpl)cf1.getServerLocator()).isReceivedToplogy()) {
Thread.sleep(50);
}
for (int i = 0; i < CONNECTION_COUNT; i++) {
queueConnection = qraConnectionFactory.createQueueConnection();
s = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
sessions.add(s);
mc = (ActiveMQRAManagedConnection) ((ActiveMQRASession) s).getManagedConnection();
managedConnections.add(mc);
}
assertTrue(server.getConnectionCount() >= (CONNECTION_COUNT / 2));
assertTrue(secondaryServer.getConnectionCount() >= (CONNECTION_COUNT / 2));
}
finally {
for (Session s : sessions) {
s.close();
}
for (ActiveMQRAManagedConnection mc : managedConnections) {
mc.destroy();
}
}
}
} }

View File

@ -42,9 +42,11 @@ import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory; import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.ra.ActiveMQRAConnectionFactory; import org.apache.activemq.artemis.ra.ActiveMQRAConnectionFactory;
import org.apache.activemq.artemis.ra.ActiveMQRAConnectionFactoryImpl; import org.apache.activemq.artemis.ra.ActiveMQRAConnectionFactoryImpl;
import org.apache.activemq.artemis.ra.ActiveMQRAConnectionManager; import org.apache.activemq.artemis.ra.ActiveMQRAConnectionManager;
import org.apache.activemq.artemis.ra.ActiveMQRAManagedConnection;
import org.apache.activemq.artemis.ra.ActiveMQRAManagedConnectionFactory; import org.apache.activemq.artemis.ra.ActiveMQRAManagedConnectionFactory;
import org.apache.activemq.artemis.ra.ActiveMQRASession; import org.apache.activemq.artemis.ra.ActiveMQRASession;
import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter; import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter;
@ -311,4 +313,54 @@ public class OutgoingConnectionTest extends ActiveMQRATestBase {
assertTrue(xaResourceWrapper.getProductVersion().equals(VersionLoader.getVersion().getFullVersion())); assertTrue(xaResourceWrapper.getProductVersion().equals(VersionLoader.getVersion().getFullVersion()));
assertTrue(xaResourceWrapper.getProductName().equals(ActiveMQResourceAdapter.PRODUCT_NAME)); assertTrue(xaResourceWrapper.getProductName().equals(ActiveMQResourceAdapter.PRODUCT_NAME));
} }
@Test
public void testSharedActiveMQConnectionFactory() throws Exception {
Session s = null;
Session s2 = null;
ActiveMQRAManagedConnection mc = null;
ActiveMQRAManagedConnection mc2 = null;
try {
resourceAdapter = new ActiveMQResourceAdapter();
resourceAdapter.setConnectorClassName(InVMConnectorFactory.class.getName());
MyBootstrapContext ctx = new MyBootstrapContext();
resourceAdapter.start(ctx);
ActiveMQRAConnectionManager qraConnectionManager = new ActiveMQRAConnectionManager();
ActiveMQRAManagedConnectionFactory mcf = new ActiveMQRAManagedConnectionFactory();
mcf.setResourceAdapter(resourceAdapter);
ActiveMQRAConnectionFactory qraConnectionFactory = new ActiveMQRAConnectionFactoryImpl(mcf, qraConnectionManager);
QueueConnection queueConnection = qraConnectionFactory.createQueueConnection();
s = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
mc = (ActiveMQRAManagedConnection) ((ActiveMQRASession) s).getManagedConnection();
ActiveMQConnectionFactory cf1 = mc.getConnectionFactory();
QueueConnection queueConnection2 = qraConnectionFactory.createQueueConnection();
s2 = queueConnection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
mc2 = (ActiveMQRAManagedConnection) ((ActiveMQRASession) s2).getManagedConnection();
ActiveMQConnectionFactory cf2 = mc2.getConnectionFactory();
// we're not testing equality so don't use equals(); we're testing if they are actually the *same* object
assertTrue(cf1 == cf2);
}
finally {
if (s != null) {
s.close();
}
if (mc != null) {
mc.destroy();
}
if (s2 != null) {
s2.close();
}
if (mc2 != null) {
mc2.destroy();
}
}
}
} }

View File

@ -17,10 +17,15 @@
package org.apache.activemq.artemis.tests.unit.ra; package org.apache.activemq.artemis.tests.unit.ra;
import java.beans.PropertyDescriptor; import java.beans.PropertyDescriptor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.SortedSet; import java.util.SortedSet;
import java.util.TreeSet; import java.util.TreeSet;
import org.apache.activemq.artemis.ra.ConnectionFactoryProperties;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter; import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter;
@ -99,4 +104,85 @@ public class ConnectionFactoryPropertiesTest extends ActiveMQTestBase {
} }
return names; return names;
} }
@Test
public void testEquality() throws Exception {
ConnectionFactoryProperties cfp1 = new ConnectionFactoryProperties();
List<String> connectorClassNames1 = new ArrayList<String>();
connectorClassNames1.add("myConnector");
cfp1.setParsedConnectorClassNames(connectorClassNames1);
List<Map<String, Object>> connectionParameters1 = new ArrayList<Map<String, Object>>();
Map<String, Object> params1 = new HashMap<String, Object>();
params1.put("port", "0");
connectionParameters1.add(params1);
cfp1.setParsedConnectionParameters(connectionParameters1);
cfp1.setAutoGroup(true);
ConnectionFactoryProperties cfp2 = new ConnectionFactoryProperties();
List<String> connectorClassNames2 = new ArrayList<String>();
connectorClassNames2.add("myConnector");
cfp2.setParsedConnectorClassNames(connectorClassNames2);
List<Map<String, Object>> connectionParameters2 = new ArrayList<Map<String, Object>>();
Map<String, Object> params2 = new HashMap<String, Object>();
params2.put("port", "0");
connectionParameters2.add(params2);
cfp2.setParsedConnectionParameters(connectionParameters2);
cfp2.setAutoGroup(true);
assertTrue(cfp1.equals(cfp2));
}
@Test
public void testInequality() throws Exception {
ConnectionFactoryProperties cfp1 = new ConnectionFactoryProperties();
List<String> connectorClassNames1 = new ArrayList<String>();
connectorClassNames1.add("myConnector");
cfp1.setParsedConnectorClassNames(connectorClassNames1);
List<Map<String, Object>> connectionParameters1 = new ArrayList<Map<String, Object>>();
Map<String, Object> params1 = new HashMap<String, Object>();
params1.put("port", "0");
connectionParameters1.add(params1);
cfp1.setParsedConnectionParameters(connectionParameters1);
cfp1.setAutoGroup(true);
ConnectionFactoryProperties cfp2 = new ConnectionFactoryProperties();
List<String> connectorClassNames2 = new ArrayList<String>();
connectorClassNames2.add("myConnector");
cfp2.setParsedConnectorClassNames(connectorClassNames2);
List<Map<String, Object>> connectionParameters2 = new ArrayList<Map<String, Object>>();
Map<String, Object> params2 = new HashMap<String, Object>();
params2.put("port", "1");
connectionParameters2.add(params2);
cfp2.setParsedConnectionParameters(connectionParameters2);
cfp2.setAutoGroup(true);
assertFalse(cfp1.equals(cfp2));
}
@Test
public void testInequality2() throws Exception {
ConnectionFactoryProperties cfp1 = new ConnectionFactoryProperties();
List<String> connectorClassNames1 = new ArrayList<String>();
connectorClassNames1.add("myConnector");
cfp1.setParsedConnectorClassNames(connectorClassNames1);
List<Map<String, Object>> connectionParameters1 = new ArrayList<Map<String, Object>>();
Map<String, Object> params1 = new HashMap<String, Object>();
params1.put("port", "0");
connectionParameters1.add(params1);
cfp1.setParsedConnectionParameters(connectionParameters1);
cfp1.setAutoGroup(true);
ConnectionFactoryProperties cfp2 = new ConnectionFactoryProperties();
List<String> connectorClassNames2 = new ArrayList<String>();
connectorClassNames2.add("myConnector2");
cfp2.setParsedConnectorClassNames(connectorClassNames2);
List<Map<String, Object>> connectionParameters2 = new ArrayList<Map<String, Object>>();
Map<String, Object> params2 = new HashMap<String, Object>();
params2.put("port", "0");
connectionParameters2.add(params2);
cfp2.setParsedConnectionParameters(connectionParameters2);
cfp2.setAutoGroup(true);
assertFalse(cfp1.equals(cfp2));
}
} }