diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java index dc84ace6aa..9f628ed778 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java @@ -301,7 +301,7 @@ public final class ActiveMQRAManagedConnection implements ManagedConnection, Exc // we must close the ActiveMQConnectionFactory because it contains a ServerLocator if (connectionFactory != null) { - connectionFactory.close(); + ra.closeConnectionFactory(mcf.getProperties()); } } catch (Throwable e) { diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java index 24c659fcc1..b4370c9676 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -43,17 +44,18 @@ import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.ChannelBroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.JGroupsFileBroadcastEndpointFactory; +import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; -import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.api.jms.JMSFactoryType; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.ra.inflow.ActiveMQActivation; import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec; import org.apache.activemq.artemis.ra.recovery.RecoveryManager; -import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.service.extensions.ServiceUtils; import org.apache.activemq.artemis.service.extensions.xa.recovery.XARecoveryConfig; import org.apache.activemq.artemis.utils.SensitiveDataCodec; @@ -127,7 +129,7 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable { * configured the exact same way. Using the same connection factory instance also makes connection load-balancing * behave as expected for outbound connections. */ - private final Map knownConnectionFactories = new HashMap(); + private final Map> knownConnectionFactories = new HashMap>(); /** * Constructor @@ -275,8 +277,8 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable { managedConnectionFactories.clear(); - for (ActiveMQConnectionFactory knownConnectionFactory : knownConnectionFactories.values()) { - knownConnectionFactory.close(); + for (Pair pair : knownConnectionFactories.values()) { + pair.getA().close(); } knownConnectionFactories.clear(); @@ -1600,119 +1602,119 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable { raProperties.setJgroupsChannelRefName(jgroupsChannelRefName); } - public ActiveMQConnectionFactory createActiveMQConnectionFactory(final ConnectionFactoryProperties overrideProperties) { + public synchronized ActiveMQConnectionFactory createActiveMQConnectionFactory(final ConnectionFactoryProperties overrideProperties) { ActiveMQConnectionFactory cf; boolean known = false; - synchronized (knownConnectionFactories) { - if (!knownConnectionFactories.keySet().contains(overrideProperties)) { - List connectorClassName = overrideProperties.getParsedConnectorClassNames() != null ? overrideProperties.getParsedConnectorClassNames() : raProperties.getParsedConnectorClassNames(); + if (!knownConnectionFactories.keySet().contains(overrideProperties)) { + List connectorClassName = overrideProperties.getParsedConnectorClassNames() != null ? overrideProperties.getParsedConnectorClassNames() : raProperties.getParsedConnectorClassNames(); - String discoveryAddress = overrideProperties.getDiscoveryAddress() != null ? overrideProperties.getDiscoveryAddress() : getDiscoveryAddress(); + String discoveryAddress = overrideProperties.getDiscoveryAddress() != null ? overrideProperties.getDiscoveryAddress() : getDiscoveryAddress(); - Boolean ha = overrideProperties.isHA() != null ? overrideProperties.isHA() : getHA(); + Boolean ha = overrideProperties.isHA() != null ? overrideProperties.isHA() : getHA(); - String jgroupsFileName = overrideProperties.getJgroupsFile() != null ? overrideProperties.getJgroupsFile() : getJgroupsFile(); + String jgroupsFileName = overrideProperties.getJgroupsFile() != null ? overrideProperties.getJgroupsFile() : getJgroupsFile(); - String jgroupsChannel = overrideProperties.getJgroupsChannelName() != null ? overrideProperties.getJgroupsChannelName() : getJgroupsChannelName(); + String jgroupsChannel = overrideProperties.getJgroupsChannelName() != null ? overrideProperties.getJgroupsChannelName() : getJgroupsChannelName(); - String jgroupsLocatorClassName = raProperties.getJgroupsChannelLocatorClass(); + String jgroupsLocatorClassName = raProperties.getJgroupsChannelLocatorClass(); - if (ha == null) { - ha = ActiveMQClient.DEFAULT_IS_HA; + if (ha == null) { + ha = ActiveMQClient.DEFAULT_IS_HA; + } + + if (discoveryAddress != null || jgroupsFileName != null || jgroupsLocatorClassName != null) { + BroadcastEndpointFactory endpointFactory = null; + + if (jgroupsLocatorClassName != null) { + String jchannelRefName = raProperties.getJgroupsChannelRefName(); + JChannel jchannel = ActiveMQRaUtils.locateJGroupsChannel(jgroupsLocatorClassName, jchannelRefName); + endpointFactory = new ChannelBroadcastEndpointFactory(jchannel, jgroupsChannel); + } + else if (discoveryAddress != null) { + Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort() : getDiscoveryPort(); + if (discoveryPort == null) { + discoveryPort = ActiveMQClient.DEFAULT_DISCOVERY_PORT; + } + + String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress() : raProperties.getDiscoveryLocalBindAddress(); + endpointFactory = new UDPBroadcastEndpointFactory().setGroupAddress(discoveryAddress).setGroupPort(discoveryPort).setLocalBindAddress(localBindAddress).setLocalBindPort(-1); + } + else if (jgroupsFileName != null) { + endpointFactory = new JGroupsFileBroadcastEndpointFactory().setChannelName(jgroupsChannel).setFile(jgroupsFileName); + } + Long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout() : raProperties.getDiscoveryRefreshTimeout(); + if (refreshTimeout == null) { + refreshTimeout = ActiveMQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT; } - if (discoveryAddress != null || jgroupsFileName != null || jgroupsLocatorClassName != null) { - BroadcastEndpointFactory endpointFactory = null; + Long initialTimeout = overrideProperties.getDiscoveryInitialWaitTimeout() != null ? overrideProperties.getDiscoveryInitialWaitTimeout() : raProperties.getDiscoveryInitialWaitTimeout(); - if (jgroupsLocatorClassName != null) { - String jchannelRefName = raProperties.getJgroupsChannelRefName(); - JChannel jchannel = ActiveMQRaUtils.locateJGroupsChannel(jgroupsLocatorClassName, jchannelRefName); - endpointFactory = new ChannelBroadcastEndpointFactory(jchannel, jgroupsChannel); - } - else if (discoveryAddress != null) { - Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort() : getDiscoveryPort(); - if (discoveryPort == null) { - discoveryPort = ActiveMQClient.DEFAULT_DISCOVERY_PORT; - } - - String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress() : raProperties.getDiscoveryLocalBindAddress(); - endpointFactory = new UDPBroadcastEndpointFactory().setGroupAddress(discoveryAddress).setGroupPort(discoveryPort).setLocalBindAddress(localBindAddress).setLocalBindPort(-1); - } - else if (jgroupsFileName != null) { - endpointFactory = new JGroupsFileBroadcastEndpointFactory().setChannelName(jgroupsChannel).setFile(jgroupsFileName); - } - Long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout() : raProperties.getDiscoveryRefreshTimeout(); - if (refreshTimeout == null) { - refreshTimeout = ActiveMQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT; - } - - Long initialTimeout = overrideProperties.getDiscoveryInitialWaitTimeout() != null ? overrideProperties.getDiscoveryInitialWaitTimeout() : raProperties.getDiscoveryInitialWaitTimeout(); - - if (initialTimeout == null) { - initialTimeout = ActiveMQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT; - } - - DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration().setRefreshTimeout(refreshTimeout).setDiscoveryInitialWaitTimeout(initialTimeout).setBroadcastEndpointFactory(endpointFactory); - - if (ActiveMQRALogger.LOGGER.isDebugEnabled()) { - ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for discovery=" + groupConfiguration + " with ha=" + ha); - } - - if (ha) { - cf = ActiveMQJMSClient.createConnectionFactoryWithHA(groupConfiguration, JMSFactoryType.XA_CF); - } - else { - cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF); - } + if (initialTimeout == null) { + initialTimeout = ActiveMQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT; } - else if (connectorClassName != null) { - TransportConfiguration[] transportConfigurations = new TransportConfiguration[connectorClassName.size()]; - List> connectionParams; - if (overrideProperties.getParsedConnectorClassNames() != null) { - connectionParams = overrideProperties.getParsedConnectionParameters(); - } - else { - connectionParams = raProperties.getParsedConnectionParameters(); - } + DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration().setRefreshTimeout(refreshTimeout).setDiscoveryInitialWaitTimeout(initialTimeout).setBroadcastEndpointFactory(endpointFactory); - for (int i = 0; i < connectorClassName.size(); i++) { - TransportConfiguration tc; - if (connectionParams == null || i >= connectionParams.size()) { - tc = new TransportConfiguration(connectorClassName.get(i)); - ActiveMQRALogger.LOGGER.debug("No connector params provided using default"); - } - else { - tc = new TransportConfiguration(connectorClassName.get(i), connectionParams.get(i)); - } + if (ActiveMQRALogger.LOGGER.isDebugEnabled()) { + ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for discovery=" + groupConfiguration + " with ha=" + ha); + } - transportConfigurations[i] = tc; - } - - if (ActiveMQRALogger.LOGGER.isDebugEnabled()) { - ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for transport=" + - Arrays.toString(transportConfigurations) + " with ha=" + ha); - } - - if (ha) { - cf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.XA_CF, transportConfigurations); - } - else { - cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF, transportConfigurations); - } + if (ha) { + cf = ActiveMQJMSClient.createConnectionFactoryWithHA(groupConfiguration, JMSFactoryType.XA_CF); } else { - throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for ResourceAdapter Connection Factory"); + cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF); + } + } + else if (connectorClassName != null) { + TransportConfiguration[] transportConfigurations = new TransportConfiguration[connectorClassName.size()]; + + List> connectionParams; + if (overrideProperties.getParsedConnectorClassNames() != null) { + connectionParams = overrideProperties.getParsedConnectionParameters(); + } + else { + connectionParams = raProperties.getParsedConnectionParameters(); } - setParams(cf, overrideProperties); - knownConnectionFactories.put(overrideProperties, cf); + for (int i = 0; i < connectorClassName.size(); i++) { + TransportConfiguration tc; + if (connectionParams == null || i >= connectionParams.size()) { + tc = new TransportConfiguration(connectorClassName.get(i)); + ActiveMQRALogger.LOGGER.debug("No connector params provided using default"); + } + else { + tc = new TransportConfiguration(connectorClassName.get(i), connectionParams.get(i)); + } + + transportConfigurations[i] = tc; + } + + if (ActiveMQRALogger.LOGGER.isDebugEnabled()) { + ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for transport=" + + Arrays.toString(transportConfigurations) + " with ha=" + ha); + } + + if (ha) { + cf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.XA_CF, transportConfigurations); + } + else { + cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF, transportConfigurations); + } } else { - cf = knownConnectionFactories.get(overrideProperties); - known = true; + throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for ResourceAdapter Connection Factory"); } + + setParams(cf, overrideProperties); + knownConnectionFactories.put(overrideProperties, new Pair(cf, new AtomicInteger(1))); + } + else { + Pair pair = knownConnectionFactories.get(overrideProperties); + cf = pair.getA(); + pair.getB().incrementAndGet(); + known = true; } if (known && cf.getServerLocator().isClosed()) { @@ -1978,4 +1980,12 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable { public SensitiveDataCodec getCodecInstance() { return raProperties.getCodecInstance(); } + + public synchronized void closeConnectionFactory(ConnectionFactoryProperties properties) { + Pair pair = knownConnectionFactories.get(properties); + int references = pair.getB().decrementAndGet(); + if (pair.getA() != null && pair.getA() != defaultActiveMQConnectionFactory && references == 0) { + knownConnectionFactories.remove(properties).getA().close(); + } + } } diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java index 52c05df196..8e55061c14 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java @@ -427,7 +427,7 @@ public class ActiveMQActivation { } if (spec.isHasBeenUpdated() && factory != null) { - factory.close(); + ra.closeConnectionFactory(spec); factory = null; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/OutgoingConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/OutgoingConnectionTest.java index db7f2cd7d9..a1cf442d84 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/OutgoingConnectionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/OutgoingConnectionTest.java @@ -363,4 +363,55 @@ public class OutgoingConnectionTest extends ActiveMQRATestBase { } } } + + @Test + public void testSharedActiveMQConnectionFactoryWithClose() throws Exception { + Session s = null; + Session s2 = null; + ActiveMQRAManagedConnection mc = null; + ActiveMQRAManagedConnection mc2 = null; + + try { + server.getConfiguration().setSecurityEnabled(false); + resourceAdapter = new ActiveMQResourceAdapter(); + + resourceAdapter.setConnectorClassName(InVMConnectorFactory.class.getName()); + MyBootstrapContext ctx = new MyBootstrapContext(); + resourceAdapter.start(ctx); + ActiveMQRAConnectionManager qraConnectionManager = new ActiveMQRAConnectionManager(); + ActiveMQRAManagedConnectionFactory mcf = new ActiveMQRAManagedConnectionFactory(); + mcf.setResourceAdapter(resourceAdapter); + ActiveMQRAConnectionFactory qraConnectionFactory = new ActiveMQRAConnectionFactoryImpl(mcf, qraConnectionManager); + + QueueConnection queueConnection = qraConnectionFactory.createQueueConnection(); + s = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + mc = (ActiveMQRAManagedConnection) ((ActiveMQRASession) s).getManagedConnection(); + + QueueConnection queueConnection2 = qraConnectionFactory.createQueueConnection(); + s2 = queueConnection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + mc2 = (ActiveMQRAManagedConnection) ((ActiveMQRASession) s2).getManagedConnection(); + + mc.destroy(); + + MessageProducer producer = s2.createProducer(ActiveMQJMSClient.createQueue(MDBQUEUE)); + producer.send(s2.createTextMessage("x")); + } + finally { + if (s != null) { + s.close(); + } + + if (mc != null) { + mc.destroy(); + } + + if (s2 != null) { + s2.close(); + } + + if (mc2 != null) { + mc2.destroy(); + } + } + } }