This closes #169 ref counts on RA

This commit is contained in:
Clebert Suconic 2015-09-14 20:58:06 -04:00
commit adc536ce06
4 changed files with 158 additions and 97 deletions

View File

@ -301,7 +301,7 @@ public final class ActiveMQRAManagedConnection implements ManagedConnection, Exc
// we must close the ActiveMQConnectionFactory because it contains a ServerLocator // we must close the ActiveMQConnectionFactory because it contains a ServerLocator
if (connectionFactory != null) { if (connectionFactory != null) {
connectionFactory.close(); ra.closeConnectionFactory(mcf.getProperties());
} }
} }
catch (Throwable e) { catch (Throwable e) {

View File

@ -35,6 +35,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -43,17 +44,18 @@ import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.ChannelBroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.ChannelBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.JGroupsFileBroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.JGroupsFileBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.JMSFactoryType; import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.ra.inflow.ActiveMQActivation; import org.apache.activemq.artemis.ra.inflow.ActiveMQActivation;
import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec; import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec;
import org.apache.activemq.artemis.ra.recovery.RecoveryManager; import org.apache.activemq.artemis.ra.recovery.RecoveryManager;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.service.extensions.ServiceUtils; import org.apache.activemq.artemis.service.extensions.ServiceUtils;
import org.apache.activemq.artemis.service.extensions.xa.recovery.XARecoveryConfig; import org.apache.activemq.artemis.service.extensions.xa.recovery.XARecoveryConfig;
import org.apache.activemq.artemis.utils.SensitiveDataCodec; import org.apache.activemq.artemis.utils.SensitiveDataCodec;
@ -127,7 +129,7 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
* configured the exact same way. Using the same connection factory instance also makes connection load-balancing * configured the exact same way. Using the same connection factory instance also makes connection load-balancing
* behave as expected for outbound connections. * behave as expected for outbound connections.
*/ */
private final Map<ConnectionFactoryProperties, ActiveMQConnectionFactory> knownConnectionFactories = new HashMap<ConnectionFactoryProperties, ActiveMQConnectionFactory>(); private final Map<ConnectionFactoryProperties, Pair<ActiveMQConnectionFactory, AtomicInteger>> knownConnectionFactories = new HashMap<ConnectionFactoryProperties, Pair<ActiveMQConnectionFactory, AtomicInteger>>();
/** /**
* Constructor * Constructor
@ -275,8 +277,8 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
managedConnectionFactories.clear(); managedConnectionFactories.clear();
for (ActiveMQConnectionFactory knownConnectionFactory : knownConnectionFactories.values()) { for (Pair<ActiveMQConnectionFactory, AtomicInteger> pair : knownConnectionFactories.values()) {
knownConnectionFactory.close(); pair.getA().close();
} }
knownConnectionFactories.clear(); knownConnectionFactories.clear();
@ -1600,11 +1602,10 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
raProperties.setJgroupsChannelRefName(jgroupsChannelRefName); raProperties.setJgroupsChannelRefName(jgroupsChannelRefName);
} }
public ActiveMQConnectionFactory createActiveMQConnectionFactory(final ConnectionFactoryProperties overrideProperties) { public synchronized ActiveMQConnectionFactory createActiveMQConnectionFactory(final ConnectionFactoryProperties overrideProperties) {
ActiveMQConnectionFactory cf; ActiveMQConnectionFactory cf;
boolean known = false; boolean known = false;
synchronized (knownConnectionFactories) {
if (!knownConnectionFactories.keySet().contains(overrideProperties)) { if (!knownConnectionFactories.keySet().contains(overrideProperties)) {
List<String> connectorClassName = overrideProperties.getParsedConnectorClassNames() != null ? overrideProperties.getParsedConnectorClassNames() : raProperties.getParsedConnectorClassNames(); List<String> connectorClassName = overrideProperties.getParsedConnectorClassNames() != null ? overrideProperties.getParsedConnectorClassNames() : raProperties.getParsedConnectorClassNames();
@ -1707,13 +1708,14 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
} }
setParams(cf, overrideProperties); setParams(cf, overrideProperties);
knownConnectionFactories.put(overrideProperties, cf); knownConnectionFactories.put(overrideProperties, new Pair(cf, new AtomicInteger(1)));
} }
else { else {
cf = knownConnectionFactories.get(overrideProperties); Pair<ActiveMQConnectionFactory, AtomicInteger> pair = knownConnectionFactories.get(overrideProperties);
cf = pair.getA();
pair.getB().incrementAndGet();
known = true; known = true;
} }
}
if (known && cf.getServerLocator().isClosed()) { if (known && cf.getServerLocator().isClosed()) {
knownConnectionFactories.remove(overrideProperties); knownConnectionFactories.remove(overrideProperties);
@ -1978,4 +1980,12 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
public SensitiveDataCodec<String> getCodecInstance() { public SensitiveDataCodec<String> getCodecInstance() {
return raProperties.getCodecInstance(); return raProperties.getCodecInstance();
} }
public synchronized void closeConnectionFactory(ConnectionFactoryProperties properties) {
Pair<ActiveMQConnectionFactory, AtomicInteger> pair = knownConnectionFactories.get(properties);
int references = pair.getB().decrementAndGet();
if (pair.getA() != null && pair.getA() != defaultActiveMQConnectionFactory && references == 0) {
knownConnectionFactories.remove(properties).getA().close();
}
}
} }

View File

@ -427,7 +427,7 @@ public class ActiveMQActivation {
} }
if (spec.isHasBeenUpdated() && factory != null) { if (spec.isHasBeenUpdated() && factory != null) {
factory.close(); ra.closeConnectionFactory(spec);
factory = null; factory = null;
} }

View File

@ -363,4 +363,55 @@ public class OutgoingConnectionTest extends ActiveMQRATestBase {
} }
} }
} }
@Test
public void testSharedActiveMQConnectionFactoryWithClose() throws Exception {
Session s = null;
Session s2 = null;
ActiveMQRAManagedConnection mc = null;
ActiveMQRAManagedConnection mc2 = null;
try {
server.getConfiguration().setSecurityEnabled(false);
resourceAdapter = new ActiveMQResourceAdapter();
resourceAdapter.setConnectorClassName(InVMConnectorFactory.class.getName());
MyBootstrapContext ctx = new MyBootstrapContext();
resourceAdapter.start(ctx);
ActiveMQRAConnectionManager qraConnectionManager = new ActiveMQRAConnectionManager();
ActiveMQRAManagedConnectionFactory mcf = new ActiveMQRAManagedConnectionFactory();
mcf.setResourceAdapter(resourceAdapter);
ActiveMQRAConnectionFactory qraConnectionFactory = new ActiveMQRAConnectionFactoryImpl(mcf, qraConnectionManager);
QueueConnection queueConnection = qraConnectionFactory.createQueueConnection();
s = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
mc = (ActiveMQRAManagedConnection) ((ActiveMQRASession) s).getManagedConnection();
QueueConnection queueConnection2 = qraConnectionFactory.createQueueConnection();
s2 = queueConnection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
mc2 = (ActiveMQRAManagedConnection) ((ActiveMQRASession) s2).getManagedConnection();
mc.destroy();
MessageProducer producer = s2.createProducer(ActiveMQJMSClient.createQueue(MDBQUEUE));
producer.send(s2.createTextMessage("x"));
}
finally {
if (s != null) {
s.close();
}
if (mc != null) {
mc.destroy();
}
if (s2 != null) {
s2.close();
}
if (mc2 != null) {
mc2.destroy();
}
}
}
} }