AMQ-5790 - rework the fix from AMQ-4486 to tie the inbound xaResource connection lifectyle to the rar start/stop

This commit is contained in:
gtully 2019-02-26 12:26:18 +00:00
parent 7c872afa22
commit 5bd2abf85d
4 changed files with 31 additions and 136 deletions

View File

@ -271,7 +271,7 @@ public class ActiveMQConnectionRequestInfo implements ConnectionRequestInfo, Ser
@Override
public String toString() {
return new StringBuffer("ActiveMQConnectionRequestInfo{ userName = '").append(userName).append("' ").append(", serverUrl = '").append(serverUrl)
.append("' ").append(", clientid = '").append(clientid).append("' ").append(", userName = '").append(userName).append("' ")
.append("' ").append(", clientid = '").append(clientid).append("' ")
.append(", useSessionArgs = '").append(useSessionArgs).append("' ").append(", useInboundSession = '").append(useInboundSession).append("' }")
.toString();
}

View File

@ -18,7 +18,9 @@ package org.apache.activemq.ra;
import java.io.Serializable;
import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSException;
import javax.resource.NotSupportedException;
@ -54,12 +56,13 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
private static final long serialVersionUID = 360805587169336959L;
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQResourceAdapter.class);
private transient final HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker> endpointWorkers = new HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker>();
private final AtomicBoolean started = new AtomicBoolean(false);
private transient BootstrapContext bootstrapContext;
private String brokerXmlConfig;
private transient BrokerService broker;
private transient Thread brokerStartThread;
private ActiveMQConnectionFactory connectionFactory;
private transient TransactionContext xaRecoveryTransactionContext;
/**
*
@ -73,6 +76,7 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
*/
@Override
public void start(BootstrapContext bootstrapContext) throws ResourceAdapterInternalException {
log.debug("Start: " + this.getInfo());
this.bootstrapContext = bootstrapContext;
if (brokerXmlConfig != null && brokerXmlConfig.trim().length() > 0) {
brokerStartThread = new Thread("Starting ActiveMQ Broker") {
@ -108,6 +112,7 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
Thread.currentThread().interrupt();
}
}
started.compareAndSet(false, true);
}
public ActiveMQConnection makeConnection() throws JMSException {
@ -152,6 +157,8 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
*/
@Override
public void stop() {
log.debug("Stop: " + this.getInfo());
started.compareAndSet(true, false);
synchronized (endpointWorkers) {
while (endpointWorkers.size() > 0) {
ActiveMQEndpointActivationKey key = endpointWorkers.keySet().iterator().next();
@ -167,9 +174,15 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
ServiceSupport.dispose(broker);
broker = null;
}
if (xaRecoveryTransactionContext != null) {
try {
xaRecoveryTransactionContext.getConnection().close();
} catch (Throwable ignored) {}
}
}
this.bootstrapContext = null;
this.xaRecoveryTransactionContext = null;
}
/**
@ -249,138 +262,19 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
*/
@Override
public XAResource[] getXAResources(ActivationSpec[] activationSpecs) throws ResourceException {
LOG.debug("getXAResources: activationSpecs" + (activationSpecs != null ? Arrays.asList(activationSpecs) : "[]") + ", info: " + getInfo());
if (!started.get()) {
LOG.debug("RAR[" + this.getInfo() + "] stopped or undeployed; no connection available for xa recovery");
return new XAResource[]{};
}
try {
return new XAResource[]{
new TransactionContext() {
@Override
public boolean isSameRM(XAResource xaresource) throws XAException {
ActiveMQConnection original = null;
try {
original = setConnection(newConnection());
boolean result = super.isSameRM(xaresource);
LOG.trace("{}.recover({})={}", getConnection(), xaresource, result);
return result;
} catch (JMSException e) {
LOG.trace("isSameRM({}) failed", xaresource, e);
XAException xaException = new XAException(e.getMessage());
throw xaException;
} finally {
closeConnection(original);
}
}
@Override
protected String getResourceManagerId() throws JMSException {
ActiveMQConnection original = null;
try {
original = setConnection(newConnection());
return super.getResourceManagerId();
} finally {
closeConnection(original);
}
}
@Override
public void commit(Xid xid, boolean onePhase) throws XAException {
ActiveMQConnection original = null;
try {
setConnection(newConnection());
super.commit(xid, onePhase);
LOG.trace("{}.commit({},{})", getConnection(), xid);
} catch (JMSException e) {
LOG.trace("{}.commit({},{}) failed", getConnection(), xid, onePhase, e);
throwXAException(e);
} finally {
closeConnection(original);
}
}
@Override
public void rollback(Xid xid) throws XAException {
ActiveMQConnection original = null;
try {
original = setConnection(newConnection());
super.rollback(xid);
LOG.trace("{}.rollback({})", getConnection(), xid);
} catch (JMSException e) {
LOG.trace("{}.rollback({}) failed", getConnection(), xid, e);
throwXAException(e);
} finally {
closeConnection(original);
}
}
@Override
public Xid[] recover(int flags) throws XAException {
Xid[] result = new Xid[]{};
ActiveMQConnection original = null;
try {
original = setConnection(newConnection());
result = super.recover(flags);
LOG.trace("{}.recover({})={}", getConnection(), flags, result);
} catch (JMSException e) {
LOG.trace("{}.recover({}) failed", getConnection(), flags, e);
throwXAException(e);
} finally {
closeConnection(original);
}
return result;
}
@Override
public void forget(Xid xid) throws XAException {
ActiveMQConnection original = null;
try {
original = setConnection(newConnection());
super.forget(xid);
LOG.trace("{}.forget({})", getConnection(), xid);
} catch (JMSException e) {
LOG.trace("{}.forget({}) failed", getConnection(), xid, e);
throwXAException(e);
} finally {
closeConnection(original);
}
}
private void throwXAException(JMSException e) throws XAException {
XAException xaException = new XAException(e.getMessage());
xaException.errorCode = XAException.XAER_RMFAIL;
throw xaException;
}
private ActiveMQConnection newConnection() throws JMSException {
ActiveMQConnection connection = null;
try {
connection = makeConnection();
connection.start();
} catch (JMSException ex) {
if (connection != null) {
try {
connection.close();
} catch (JMSException ignore) { }
}
throw ex;
}
return connection;
}
private void closeConnection(ActiveMQConnection original) {
ActiveMQConnection connection = getConnection();
if (connection != null) {
try {
connection.close();
} catch (JMSException ignored) {}
}
setConnection(original);
}
}};
synchronized ( this ) {
if (xaRecoveryTransactionContext == null) {
LOG.debug("Init XAResource with: " + this.getInfo());
xaRecoveryTransactionContext = new TransactionContext(makeConnection());
}
}
return new XAResource[]{ xaRecoveryTransactionContext };
} catch (Exception e) {
throw new ResourceException(e);
}

View File

@ -96,12 +96,14 @@ public class ActiveMQConnectionExecutorThreadCleanUpTest {
// connection error.
for (int i=0; i<10; i++) {
LOG.debug("Iteration " + i);
ra.start(null);
try {
XAResource[] resources = ra.getXAResources(null);
resources[0].recover(100);
} catch (Exception ex) {
LOG.error(ex.getMessage());
}
ra.stop();
// allow some small time for thread cleanup to happen
Thread.sleep(300);
@ -111,7 +113,6 @@ public class ActiveMQConnectionExecutorThreadCleanUpTest {
"\" not cleared up with ActiveMQConnection.",
hasActiveMQConnectionExceutorThread());
}
ra.stop();
}

View File

@ -17,7 +17,6 @@
package org.apache.activemq.ra;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@ -101,6 +100,7 @@ public class ActiveMQConnectionFactoryTest {
@Test(timeout = 60000)
public void testGetXAResource() throws Exception {
ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter();
ra.start(null);
ra.setServerUrl(url);
ra.setUserName(user);
ra.setPassword(pwd);
@ -114,6 +114,6 @@ public class ActiveMQConnectionFactoryTest {
XAResource[] resource2 = ra.getXAResources(null);
assertEquals("one resource", 1, resource2.length);
assertTrue("isSameRM true", resources[0].isSameRM(resource2[0]));
assertFalse("no tthe same instance", resources[0].equals(resource2[0]));
assertTrue("the same instance", resources[0].equals(resource2[0]));
}
}