mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-4486 - rework to open/close connection arround each xaresource op so there are no leaks during periodic recovery
This commit is contained in:
parent
d8cd37030b
commit
3826a23ed5
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.ra;
|
package org.apache.activemq.ra;
|
||||||
|
|
||||||
|
import java.lang.reflect.Method;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
|
||||||
|
@ -102,9 +103,6 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @see org.apache.activemq.ra.MessageResourceAdapter#makeConnection()
|
|
||||||
*/
|
|
||||||
public ActiveMQConnection makeConnection() throws JMSException {
|
public ActiveMQConnection makeConnection() throws JMSException {
|
||||||
if( connectionFactory == null ) {
|
if( connectionFactory == null ) {
|
||||||
return makeConnection(getInfo());
|
return makeConnection(getInfo());
|
||||||
|
@ -235,18 +233,23 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
|
||||||
*/
|
*/
|
||||||
public XAResource[] getXAResources(ActivationSpec[] activationSpecs) throws ResourceException {
|
public XAResource[] getXAResources(ActivationSpec[] activationSpecs) throws ResourceException {
|
||||||
try {
|
try {
|
||||||
final ActiveMQConnection connection = makeConnection();
|
return new XAResource[]{(XAResource)
|
||||||
return new XAResource[]{new LocalAndXATransaction(new TransactionContext(connection)) {
|
java.lang.reflect.Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{XAResource.class},
|
||||||
public void finalize() throws Throwable {
|
new java.lang.reflect.InvocationHandler () {
|
||||||
|
@Override
|
||||||
|
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
|
||||||
|
ActiveMQConnection connection = makeConnection();
|
||||||
|
try {
|
||||||
|
return method.invoke(new TransactionContext(connection), args);
|
||||||
|
} finally {
|
||||||
try {
|
try {
|
||||||
connection.close();
|
connection.close();
|
||||||
} catch (Throwable ignore) {
|
} catch (Throwable ignore) {}
|
||||||
} finally {
|
|
||||||
super.finalize();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}};
|
})};
|
||||||
} catch (JMSException e) {
|
|
||||||
|
} catch (Exception e) {
|
||||||
throw new ResourceException(e);
|
throw new ResourceException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -113,9 +113,9 @@ public class ActiveMQConnectionFactoryTest extends TestCase {
|
||||||
ra.setUserName(user);
|
ra.setUserName(user);
|
||||||
ra.setPassword(pwd);
|
ra.setPassword(pwd);
|
||||||
|
|
||||||
XAResource[] resoruces = ra.getXAResources(null);
|
XAResource[] resources = ra.getXAResources(null);
|
||||||
assertEquals("one resource", 1, resoruces.length);
|
assertEquals("one resource", 1, resources.length);
|
||||||
|
|
||||||
assertEquals("no pending transactions", 0, resoruces[0].recover(100).length);
|
assertEquals("no pending transactions", 0, resources[0].recover(100).length);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue