From 5bd2abf85dbda14bf41f54d09af4866e814f931f Mon Sep 17 00:00:00 2001 From: gtully Date: Tue, 26 Feb 2019 12:26:18 +0000 Subject: [PATCH] AMQ-5790 - rework the fix from AMQ-4486 to tie the inbound xaResource connection lifectyle to the rar start/stop --- .../ra/ActiveMQConnectionRequestInfo.java | 2 +- .../activemq/ra/ActiveMQResourceAdapter.java | 158 +++--------------- ...MQConnectionExecutorThreadCleanUpTest.java | 3 +- .../ra/ActiveMQConnectionFactoryTest.java | 4 +- 4 files changed, 31 insertions(+), 136 deletions(-) diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionRequestInfo.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionRequestInfo.java index 7ed3f26319..5086b415c1 100644 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionRequestInfo.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionRequestInfo.java @@ -271,7 +271,7 @@ public class ActiveMQConnectionRequestInfo implements ConnectionRequestInfo, Ser @Override public String toString() { return new StringBuffer("ActiveMQConnectionRequestInfo{ userName = '").append(userName).append("' ").append(", serverUrl = '").append(serverUrl) - .append("' ").append(", clientid = '").append(clientid).append("' ").append(", userName = '").append(userName).append("' ") + .append("' ").append(", clientid = '").append(clientid).append("' ") .append(", useSessionArgs = '").append(useSessionArgs).append("' ").append(", useInboundSession = '").append(useInboundSession).append("' }") .toString(); } diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java index b17674e88a..8d7dd6fe74 100644 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java @@ -18,7 +18,9 @@ package org.apache.activemq.ra; import java.io.Serializable; import java.net.URI; +import java.util.Arrays; import java.util.HashMap; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.JMSException; import javax.resource.NotSupportedException; @@ -54,12 +56,13 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement private static final long serialVersionUID = 360805587169336959L; private static final Logger LOG = LoggerFactory.getLogger(ActiveMQResourceAdapter.class); private transient final HashMap endpointWorkers = new HashMap(); - + private final AtomicBoolean started = new AtomicBoolean(false); private transient BootstrapContext bootstrapContext; private String brokerXmlConfig; private transient BrokerService broker; private transient Thread brokerStartThread; private ActiveMQConnectionFactory connectionFactory; + private transient TransactionContext xaRecoveryTransactionContext; /** * @@ -73,6 +76,7 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement */ @Override public void start(BootstrapContext bootstrapContext) throws ResourceAdapterInternalException { + log.debug("Start: " + this.getInfo()); this.bootstrapContext = bootstrapContext; if (brokerXmlConfig != null && brokerXmlConfig.trim().length() > 0) { brokerStartThread = new Thread("Starting ActiveMQ Broker") { @@ -108,6 +112,7 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement Thread.currentThread().interrupt(); } } + started.compareAndSet(false, true); } public ActiveMQConnection makeConnection() throws JMSException { @@ -152,6 +157,8 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement */ @Override public void stop() { + log.debug("Stop: " + this.getInfo()); + started.compareAndSet(true, false); synchronized (endpointWorkers) { while (endpointWorkers.size() > 0) { ActiveMQEndpointActivationKey key = endpointWorkers.keySet().iterator().next(); @@ -167,9 +174,15 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement ServiceSupport.dispose(broker); broker = null; } + if (xaRecoveryTransactionContext != null) { + try { + xaRecoveryTransactionContext.getConnection().close(); + } catch (Throwable ignored) {} + } } this.bootstrapContext = null; + this.xaRecoveryTransactionContext = null; } /** @@ -249,138 +262,19 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement */ @Override public XAResource[] getXAResources(ActivationSpec[] activationSpecs) throws ResourceException { + LOG.debug("getXAResources: activationSpecs" + (activationSpecs != null ? Arrays.asList(activationSpecs) : "[]") + ", info: " + getInfo()); + if (!started.get()) { + LOG.debug("RAR[" + this.getInfo() + "] stopped or undeployed; no connection available for xa recovery"); + return new XAResource[]{}; + } try { - return new XAResource[]{ - new TransactionContext() { - - @Override - public boolean isSameRM(XAResource xaresource) throws XAException { - ActiveMQConnection original = null; - try { - original = setConnection(newConnection()); - boolean result = super.isSameRM(xaresource); - LOG.trace("{}.recover({})={}", getConnection(), xaresource, result); - return result; - - } catch (JMSException e) { - LOG.trace("isSameRM({}) failed", xaresource, e); - XAException xaException = new XAException(e.getMessage()); - throw xaException; - } finally { - closeConnection(original); - } - } - - @Override - protected String getResourceManagerId() throws JMSException { - ActiveMQConnection original = null; - try { - original = setConnection(newConnection()); - return super.getResourceManagerId(); - } finally { - closeConnection(original); - } - } - - @Override - public void commit(Xid xid, boolean onePhase) throws XAException { - ActiveMQConnection original = null; - try { - setConnection(newConnection()); - super.commit(xid, onePhase); - LOG.trace("{}.commit({},{})", getConnection(), xid); - - } catch (JMSException e) { - LOG.trace("{}.commit({},{}) failed", getConnection(), xid, onePhase, e); - throwXAException(e); - } finally { - closeConnection(original); - } - } - - @Override - public void rollback(Xid xid) throws XAException { - ActiveMQConnection original = null; - try { - original = setConnection(newConnection()); - super.rollback(xid); - LOG.trace("{}.rollback({})", getConnection(), xid); - - } catch (JMSException e) { - LOG.trace("{}.rollback({}) failed", getConnection(), xid, e); - throwXAException(e); - } finally { - closeConnection(original); - } - } - - @Override - public Xid[] recover(int flags) throws XAException { - Xid[] result = new Xid[]{}; - ActiveMQConnection original = null; - try { - original = setConnection(newConnection()); - result = super.recover(flags); - LOG.trace("{}.recover({})={}", getConnection(), flags, result); - - } catch (JMSException e) { - LOG.trace("{}.recover({}) failed", getConnection(), flags, e); - throwXAException(e); - } finally { - closeConnection(original); - } - return result; - } - - @Override - public void forget(Xid xid) throws XAException { - ActiveMQConnection original = null; - try { - original = setConnection(newConnection()); - super.forget(xid); - LOG.trace("{}.forget({})", getConnection(), xid); - - } catch (JMSException e) { - LOG.trace("{}.forget({}) failed", getConnection(), xid, e); - throwXAException(e); - } finally { - closeConnection(original); - } - } - - private void throwXAException(JMSException e) throws XAException { - XAException xaException = new XAException(e.getMessage()); - xaException.errorCode = XAException.XAER_RMFAIL; - throw xaException; - } - - private ActiveMQConnection newConnection() throws JMSException { - ActiveMQConnection connection = null; - try { - connection = makeConnection(); - connection.start(); - } catch (JMSException ex) { - if (connection != null) { - try { - connection.close(); - } catch (JMSException ignore) { } - } - throw ex; - } - return connection; - } - - private void closeConnection(ActiveMQConnection original) { - ActiveMQConnection connection = getConnection(); - if (connection != null) { - try { - connection.close(); - } catch (JMSException ignored) {} - } - setConnection(original); - } - }}; - + synchronized ( this ) { + if (xaRecoveryTransactionContext == null) { + LOG.debug("Init XAResource with: " + this.getInfo()); + xaRecoveryTransactionContext = new TransactionContext(makeConnection()); + } + } + return new XAResource[]{ xaRecoveryTransactionContext }; } catch (Exception e) { throw new ResourceException(e); } diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionExecutorThreadCleanUpTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionExecutorThreadCleanUpTest.java index ccb0d6dcb8..684d10101d 100644 --- a/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionExecutorThreadCleanUpTest.java +++ b/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionExecutorThreadCleanUpTest.java @@ -96,12 +96,14 @@ public class ActiveMQConnectionExecutorThreadCleanUpTest { // connection error. for (int i=0; i<10; i++) { LOG.debug("Iteration " + i); + ra.start(null); try { XAResource[] resources = ra.getXAResources(null); resources[0].recover(100); } catch (Exception ex) { LOG.error(ex.getMessage()); } + ra.stop(); // allow some small time for thread cleanup to happen Thread.sleep(300); @@ -111,7 +113,6 @@ public class ActiveMQConnectionExecutorThreadCleanUpTest { "\" not cleared up with ActiveMQConnection.", hasActiveMQConnectionExceutorThread()); } - ra.stop(); } diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java index b3e6349d8a..2d11666d1a 100644 --- a/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java +++ b/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java @@ -17,7 +17,6 @@ package org.apache.activemq.ra; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -101,6 +100,7 @@ public class ActiveMQConnectionFactoryTest { @Test(timeout = 60000) public void testGetXAResource() throws Exception { ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter(); + ra.start(null); ra.setServerUrl(url); ra.setUserName(user); ra.setPassword(pwd); @@ -114,6 +114,6 @@ public class ActiveMQConnectionFactoryTest { XAResource[] resource2 = ra.getXAResources(null); assertEquals("one resource", 1, resource2.length); assertTrue("isSameRM true", resources[0].isSameRM(resource2[0])); - assertFalse("no tthe same instance", resources[0].equals(resource2[0])); + assertTrue("the same instance", resources[0].equals(resource2[0])); } }