ARTEMIS-210 count references

It is possible for the closure of one resource to potentially impact
another since they are now sharing the same ServerLocator instance.
Keep track of references to avoid this.
This commit is contained in:
jbertram 2015-09-14 11:18:31 -05:00
parent cbb6c63d00
commit 2eac97aaff
4 changed files with 158 additions and 97 deletions

View File

@ -301,7 +301,7 @@ public final class ActiveMQRAManagedConnection implements ManagedConnection, Exc
// we must close the ActiveMQConnectionFactory because it contains a ServerLocator
if (connectionFactory != null) {
connectionFactory.close();
ra.closeConnectionFactory(mcf.getProperties());
}
}
catch (Throwable e) {

View File

@ -35,6 +35,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -43,17 +44,18 @@ import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.ChannelBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.JGroupsFileBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.ra.inflow.ActiveMQActivation;
import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec;
import org.apache.activemq.artemis.ra.recovery.RecoveryManager;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.service.extensions.ServiceUtils;
import org.apache.activemq.artemis.service.extensions.xa.recovery.XARecoveryConfig;
import org.apache.activemq.artemis.utils.SensitiveDataCodec;
@ -127,7 +129,7 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
* configured the exact same way. Using the same connection factory instance also makes connection load-balancing
* behave as expected for outbound connections.
*/
private final Map<ConnectionFactoryProperties, ActiveMQConnectionFactory> knownConnectionFactories = new HashMap<ConnectionFactoryProperties, ActiveMQConnectionFactory>();
private final Map<ConnectionFactoryProperties, Pair<ActiveMQConnectionFactory, AtomicInteger>> knownConnectionFactories = new HashMap<ConnectionFactoryProperties, Pair<ActiveMQConnectionFactory, AtomicInteger>>();
/**
* Constructor
@ -275,8 +277,8 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
managedConnectionFactories.clear();
for (ActiveMQConnectionFactory knownConnectionFactory : knownConnectionFactories.values()) {
knownConnectionFactory.close();
for (Pair<ActiveMQConnectionFactory, AtomicInteger> pair : knownConnectionFactories.values()) {
pair.getA().close();
}
knownConnectionFactories.clear();
@ -1600,119 +1602,119 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
raProperties.setJgroupsChannelRefName(jgroupsChannelRefName);
}
public ActiveMQConnectionFactory createActiveMQConnectionFactory(final ConnectionFactoryProperties overrideProperties) {
public synchronized ActiveMQConnectionFactory createActiveMQConnectionFactory(final ConnectionFactoryProperties overrideProperties) {
ActiveMQConnectionFactory cf;
boolean known = false;
synchronized (knownConnectionFactories) {
if (!knownConnectionFactories.keySet().contains(overrideProperties)) {
List<String> connectorClassName = overrideProperties.getParsedConnectorClassNames() != null ? overrideProperties.getParsedConnectorClassNames() : raProperties.getParsedConnectorClassNames();
if (!knownConnectionFactories.keySet().contains(overrideProperties)) {
List<String> connectorClassName = overrideProperties.getParsedConnectorClassNames() != null ? overrideProperties.getParsedConnectorClassNames() : raProperties.getParsedConnectorClassNames();
String discoveryAddress = overrideProperties.getDiscoveryAddress() != null ? overrideProperties.getDiscoveryAddress() : getDiscoveryAddress();
String discoveryAddress = overrideProperties.getDiscoveryAddress() != null ? overrideProperties.getDiscoveryAddress() : getDiscoveryAddress();
Boolean ha = overrideProperties.isHA() != null ? overrideProperties.isHA() : getHA();
Boolean ha = overrideProperties.isHA() != null ? overrideProperties.isHA() : getHA();
String jgroupsFileName = overrideProperties.getJgroupsFile() != null ? overrideProperties.getJgroupsFile() : getJgroupsFile();
String jgroupsFileName = overrideProperties.getJgroupsFile() != null ? overrideProperties.getJgroupsFile() : getJgroupsFile();
String jgroupsChannel = overrideProperties.getJgroupsChannelName() != null ? overrideProperties.getJgroupsChannelName() : getJgroupsChannelName();
String jgroupsChannel = overrideProperties.getJgroupsChannelName() != null ? overrideProperties.getJgroupsChannelName() : getJgroupsChannelName();
String jgroupsLocatorClassName = raProperties.getJgroupsChannelLocatorClass();
String jgroupsLocatorClassName = raProperties.getJgroupsChannelLocatorClass();
if (ha == null) {
ha = ActiveMQClient.DEFAULT_IS_HA;
if (ha == null) {
ha = ActiveMQClient.DEFAULT_IS_HA;
}
if (discoveryAddress != null || jgroupsFileName != null || jgroupsLocatorClassName != null) {
BroadcastEndpointFactory endpointFactory = null;
if (jgroupsLocatorClassName != null) {
String jchannelRefName = raProperties.getJgroupsChannelRefName();
JChannel jchannel = ActiveMQRaUtils.locateJGroupsChannel(jgroupsLocatorClassName, jchannelRefName);
endpointFactory = new ChannelBroadcastEndpointFactory(jchannel, jgroupsChannel);
}
else if (discoveryAddress != null) {
Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort() : getDiscoveryPort();
if (discoveryPort == null) {
discoveryPort = ActiveMQClient.DEFAULT_DISCOVERY_PORT;
}
String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress() : raProperties.getDiscoveryLocalBindAddress();
endpointFactory = new UDPBroadcastEndpointFactory().setGroupAddress(discoveryAddress).setGroupPort(discoveryPort).setLocalBindAddress(localBindAddress).setLocalBindPort(-1);
}
else if (jgroupsFileName != null) {
endpointFactory = new JGroupsFileBroadcastEndpointFactory().setChannelName(jgroupsChannel).setFile(jgroupsFileName);
}
Long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout() : raProperties.getDiscoveryRefreshTimeout();
if (refreshTimeout == null) {
refreshTimeout = ActiveMQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT;
}
if (discoveryAddress != null || jgroupsFileName != null || jgroupsLocatorClassName != null) {
BroadcastEndpointFactory endpointFactory = null;
Long initialTimeout = overrideProperties.getDiscoveryInitialWaitTimeout() != null ? overrideProperties.getDiscoveryInitialWaitTimeout() : raProperties.getDiscoveryInitialWaitTimeout();
if (jgroupsLocatorClassName != null) {
String jchannelRefName = raProperties.getJgroupsChannelRefName();
JChannel jchannel = ActiveMQRaUtils.locateJGroupsChannel(jgroupsLocatorClassName, jchannelRefName);
endpointFactory = new ChannelBroadcastEndpointFactory(jchannel, jgroupsChannel);
}
else if (discoveryAddress != null) {
Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort() : getDiscoveryPort();
if (discoveryPort == null) {
discoveryPort = ActiveMQClient.DEFAULT_DISCOVERY_PORT;
}
String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress() : raProperties.getDiscoveryLocalBindAddress();
endpointFactory = new UDPBroadcastEndpointFactory().setGroupAddress(discoveryAddress).setGroupPort(discoveryPort).setLocalBindAddress(localBindAddress).setLocalBindPort(-1);
}
else if (jgroupsFileName != null) {
endpointFactory = new JGroupsFileBroadcastEndpointFactory().setChannelName(jgroupsChannel).setFile(jgroupsFileName);
}
Long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout() : raProperties.getDiscoveryRefreshTimeout();
if (refreshTimeout == null) {
refreshTimeout = ActiveMQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT;
}
Long initialTimeout = overrideProperties.getDiscoveryInitialWaitTimeout() != null ? overrideProperties.getDiscoveryInitialWaitTimeout() : raProperties.getDiscoveryInitialWaitTimeout();
if (initialTimeout == null) {
initialTimeout = ActiveMQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT;
}
DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration().setRefreshTimeout(refreshTimeout).setDiscoveryInitialWaitTimeout(initialTimeout).setBroadcastEndpointFactory(endpointFactory);
if (ActiveMQRALogger.LOGGER.isDebugEnabled()) {
ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for discovery=" + groupConfiguration + " with ha=" + ha);
}
if (ha) {
cf = ActiveMQJMSClient.createConnectionFactoryWithHA(groupConfiguration, JMSFactoryType.XA_CF);
}
else {
cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF);
}
if (initialTimeout == null) {
initialTimeout = ActiveMQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT;
}
else if (connectorClassName != null) {
TransportConfiguration[] transportConfigurations = new TransportConfiguration[connectorClassName.size()];
List<Map<String, Object>> connectionParams;
if (overrideProperties.getParsedConnectorClassNames() != null) {
connectionParams = overrideProperties.getParsedConnectionParameters();
}
else {
connectionParams = raProperties.getParsedConnectionParameters();
}
DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration().setRefreshTimeout(refreshTimeout).setDiscoveryInitialWaitTimeout(initialTimeout).setBroadcastEndpointFactory(endpointFactory);
for (int i = 0; i < connectorClassName.size(); i++) {
TransportConfiguration tc;
if (connectionParams == null || i >= connectionParams.size()) {
tc = new TransportConfiguration(connectorClassName.get(i));
ActiveMQRALogger.LOGGER.debug("No connector params provided using default");
}
else {
tc = new TransportConfiguration(connectorClassName.get(i), connectionParams.get(i));
}
if (ActiveMQRALogger.LOGGER.isDebugEnabled()) {
ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for discovery=" + groupConfiguration + " with ha=" + ha);
}
transportConfigurations[i] = tc;
}
if (ActiveMQRALogger.LOGGER.isDebugEnabled()) {
ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for transport=" +
Arrays.toString(transportConfigurations) + " with ha=" + ha);
}
if (ha) {
cf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.XA_CF, transportConfigurations);
}
else {
cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF, transportConfigurations);
}
if (ha) {
cf = ActiveMQJMSClient.createConnectionFactoryWithHA(groupConfiguration, JMSFactoryType.XA_CF);
}
else {
throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for ResourceAdapter Connection Factory");
cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF);
}
}
else if (connectorClassName != null) {
TransportConfiguration[] transportConfigurations = new TransportConfiguration[connectorClassName.size()];
List<Map<String, Object>> connectionParams;
if (overrideProperties.getParsedConnectorClassNames() != null) {
connectionParams = overrideProperties.getParsedConnectionParameters();
}
else {
connectionParams = raProperties.getParsedConnectionParameters();
}
setParams(cf, overrideProperties);
knownConnectionFactories.put(overrideProperties, cf);
for (int i = 0; i < connectorClassName.size(); i++) {
TransportConfiguration tc;
if (connectionParams == null || i >= connectionParams.size()) {
tc = new TransportConfiguration(connectorClassName.get(i));
ActiveMQRALogger.LOGGER.debug("No connector params provided using default");
}
else {
tc = new TransportConfiguration(connectorClassName.get(i), connectionParams.get(i));
}
transportConfigurations[i] = tc;
}
if (ActiveMQRALogger.LOGGER.isDebugEnabled()) {
ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for transport=" +
Arrays.toString(transportConfigurations) + " with ha=" + ha);
}
if (ha) {
cf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.XA_CF, transportConfigurations);
}
else {
cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF, transportConfigurations);
}
}
else {
cf = knownConnectionFactories.get(overrideProperties);
known = true;
throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for ResourceAdapter Connection Factory");
}
setParams(cf, overrideProperties);
knownConnectionFactories.put(overrideProperties, new Pair(cf, new AtomicInteger(1)));
}
else {
Pair<ActiveMQConnectionFactory, AtomicInteger> pair = knownConnectionFactories.get(overrideProperties);
cf = pair.getA();
pair.getB().incrementAndGet();
known = true;
}
if (known && cf.getServerLocator().isClosed()) {
@ -1978,4 +1980,12 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
public SensitiveDataCodec<String> getCodecInstance() {
return raProperties.getCodecInstance();
}
public synchronized void closeConnectionFactory(ConnectionFactoryProperties properties) {
Pair<ActiveMQConnectionFactory, AtomicInteger> pair = knownConnectionFactories.get(properties);
int references = pair.getB().decrementAndGet();
if (pair.getA() != null && pair.getA() != defaultActiveMQConnectionFactory && references == 0) {
knownConnectionFactories.remove(properties).getA().close();
}
}
}

View File

@ -427,7 +427,7 @@ public class ActiveMQActivation {
}
if (spec.isHasBeenUpdated() && factory != null) {
factory.close();
ra.closeConnectionFactory(spec);
factory = null;
}

View File

@ -363,4 +363,55 @@ public class OutgoingConnectionTest extends ActiveMQRATestBase {
}
}
}
@Test
public void testSharedActiveMQConnectionFactoryWithClose() throws Exception {
Session s = null;
Session s2 = null;
ActiveMQRAManagedConnection mc = null;
ActiveMQRAManagedConnection mc2 = null;
try {
server.getConfiguration().setSecurityEnabled(false);
resourceAdapter = new ActiveMQResourceAdapter();
resourceAdapter.setConnectorClassName(InVMConnectorFactory.class.getName());
MyBootstrapContext ctx = new MyBootstrapContext();
resourceAdapter.start(ctx);
ActiveMQRAConnectionManager qraConnectionManager = new ActiveMQRAConnectionManager();
ActiveMQRAManagedConnectionFactory mcf = new ActiveMQRAManagedConnectionFactory();
mcf.setResourceAdapter(resourceAdapter);
ActiveMQRAConnectionFactory qraConnectionFactory = new ActiveMQRAConnectionFactoryImpl(mcf, qraConnectionManager);
QueueConnection queueConnection = qraConnectionFactory.createQueueConnection();
s = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
mc = (ActiveMQRAManagedConnection) ((ActiveMQRASession) s).getManagedConnection();
QueueConnection queueConnection2 = qraConnectionFactory.createQueueConnection();
s2 = queueConnection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
mc2 = (ActiveMQRAManagedConnection) ((ActiveMQRASession) s2).getManagedConnection();
mc.destroy();
MessageProducer producer = s2.createProducer(ActiveMQJMSClient.createQueue(MDBQUEUE));
producer.send(s2.createTextMessage("x"));
}
finally {
if (s != null) {
s.close();
}
if (mc != null) {
mc.destroy();
}
if (s2 != null) {
s2.close();
}
if (mc2 != null) {
mc2.destroy();
}
}
}
}