From ed4110175534f63e0fac834a63a05440d509752c Mon Sep 17 00:00:00 2001 From: gtully Date: Thu, 11 Jun 2020 19:39:18 +0100 Subject: [PATCH] AMQ-7497 - support reconnect of the single RA xaResource connection --- .../apache/activemq/TransactionContext.java | 4 +- .../activemq/ra/ActiveMQResourceAdapter.java | 121 ++++++++++++++++-- .../ra/ActiveMQConnectionFactoryTest.java | 104 +++++++++++++++ 3 files changed, 218 insertions(+), 11 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java index fbc2fdfd4f..9815a387db 100644 --- a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java +++ b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java @@ -795,7 +795,7 @@ public class TransactionContext implements XAResource { * @param e JMSException to convert * @return XAException wrapping original exception or its message */ - private XAException toXAException(JMSException e) { + public static XAException toXAException(JMSException e) { if (e.getCause() != null && e.getCause() instanceof XAException) { XAException original = (XAException)e.getCause(); XAException xae = new XAException(original.getMessage()); @@ -818,7 +818,7 @@ public class TransactionContext implements XAResource { return xae; } - private int parseFromMessageOr(String message, int fallbackCode) { + private static int parseFromMessageOr(String message, int fallbackCode) { final String marker = "xaErrorCode:"; final int index = message.lastIndexOf(marker); if (index > -1) { diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java index 4c3f5c1353..e4edb065b3 100644 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java @@ -43,6 +43,8 @@ import org.apache.activemq.util.ServiceSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.activemq.TransactionContext.toXAException; + /** * Knows how to connect to one ActiveMQ server. It can then activate endpoints * and deliver messages to those end points using the connection configure in @@ -62,7 +64,7 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement private transient BrokerService broker; private transient Thread brokerStartThread; private ActiveMQConnectionFactory connectionFactory; - private transient TransactionContext xaRecoveryTransactionContext; + private transient ReconnectingXAResource reconnectingXaResource; /** * @@ -174,15 +176,13 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement ServiceSupport.dispose(broker); broker = null; } - if (xaRecoveryTransactionContext != null) { - try { - xaRecoveryTransactionContext.getConnection().close(); - } catch (Throwable ignored) {} + if (reconnectingXaResource != null) { + reconnectingXaResource.stop(); } } this.bootstrapContext = null; - this.xaRecoveryTransactionContext = null; + this.reconnectingXaResource = null; } /** @@ -269,17 +269,120 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement } try { synchronized ( this ) { - if (xaRecoveryTransactionContext == null) { + if (reconnectingXaResource == null) { LOG.debug("Init XAResource with: " + this.getInfo()); - xaRecoveryTransactionContext = new TransactionContext(makeConnection()); + reconnectingXaResource = new ReconnectingXAResource(new TransactionContext(makeConnection())); } } - return new XAResource[]{ xaRecoveryTransactionContext }; + + return new XAResource[]{reconnectingXaResource}; + } catch (Exception e) { throw new ResourceException(e); } } + private void ensureConnection(TransactionContext xaRecoveryTransactionContext) throws XAException { + final ActiveMQConnection existingConnection = xaRecoveryTransactionContext.getConnection(); + if (existingConnection == null || existingConnection.isTransportFailed()) { + try { + LOG.debug("reconnect XAResource with: " + this.getInfo(), existingConnection == null ? "" : existingConnection.getFirstFailureError()); + xaRecoveryTransactionContext.setConnection(makeConnection()); + } catch (JMSException e) { + throw toXAException(e); + } finally { + if (existingConnection != null) { + try { + existingConnection.close(); + } catch (Exception ignored) { + } + } + } + } + } + + private class ReconnectingXAResource implements XAResource { + protected TransactionContext delegate; + + ReconnectingXAResource(TransactionContext delegate) { + this.delegate = delegate; + } + + @Override + public void commit(Xid xid, boolean b) throws XAException { + ensureConnection(delegate); + delegate.commit(xid, b); + } + + @Override + public void end(Xid xid, int i) throws XAException { + ensureConnection(delegate); + delegate.end(xid, i); + } + + @Override + public void forget(Xid xid) throws XAException { + ensureConnection(delegate); + delegate.forget(xid); + } + + @Override + public int getTransactionTimeout() throws XAException { + ensureConnection(delegate); + return delegate.getTransactionTimeout(); + } + + @Override + public boolean isSameRM(XAResource xaResource) throws XAException { + if (this == xaResource) { + return true; + } + if (!(xaResource instanceof ReconnectingXAResource)) { + return false; + } + + ensureConnection(delegate); + return delegate.isSameRM(((ReconnectingXAResource)xaResource).delegate); + } + + @Override + public int prepare(Xid xid) throws XAException { + ensureConnection(delegate); + return delegate.prepare(xid); + } + + @Override + public Xid[] recover(int i) throws XAException { + ensureConnection(delegate); + return delegate.recover(i); + } + + @Override + public void rollback(Xid xid) throws XAException { + ensureConnection(delegate); + delegate.rollback(xid); + + } + + @Override + public boolean setTransactionTimeout(int i) throws XAException { + ensureConnection(delegate); + return delegate.setTransactionTimeout(i); + } + + @Override + public void start(Xid xid, int i) throws XAException { + ensureConnection(delegate); + delegate.start(xid, i); + } + + public void stop() { + try { + delegate.getConnection().close(); + } catch (Throwable ignored) {} + } + }; + // /////////////////////////////////////////////////////////////////////// // // Java Bean getters and setters for this ResourceAdapter class. diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java index 2d11666d1a..034e3d7752 100644 --- a/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java +++ b/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java @@ -19,6 +19,7 @@ package org.apache.activemq.ra; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -32,6 +33,9 @@ import javax.transaction.xa.XAResource; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQTopicSubscriber; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.util.Wait; import org.junit.Before; import org.junit.Test; @@ -115,5 +119,105 @@ public class ActiveMQConnectionFactoryTest { assertEquals("one resource", 1, resource2.length); assertTrue("isSameRM true", resources[0].isSameRM(resource2[0])); assertTrue("the same instance", resources[0].equals(resource2[0])); + + ra.stop(); } + + + @Test + public void testXAResourceReconnect() throws Exception { + + BrokerService brokerService = new BrokerService(); + brokerService.setPersistent(false); + brokerService.addConnector("tcp://localhost:0"); + brokerService.start(); + + try { + final TransportConnector transportConnector = brokerService.getTransportConnectors().get(0); + + String failoverUrl = String.format("failover:(%s)?maxReconnectAttempts=1", transportConnector.getConnectUri()); + + ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter(); + ra.start(null); + ra.setServerUrl(failoverUrl); + ra.setUserName(user); + ra.setPassword(pwd); + + XAResource[] resources = ra.getXAResources(null); + assertEquals("one resource", 1, resources.length); + + assertEquals("no pending transactions", 0, resources[0].recover(100).length); + + transportConnector.stop(); + assertTrue("no connections", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return transportConnector.getConnections().isEmpty(); + } + })); + + try { + resources[0].recover(100); + fail("Expect error on broken connection"); + } catch (Exception expected) { + } + + transportConnector.start(); + + // should recover ok + assertEquals("no pending transactions", 0, resources[0].recover(100).length); + + } finally { + brokerService.stop(); + } + } + + @Test + public void testXAResourceFailoverFailBack() throws Exception { + + BrokerService brokerService = new BrokerService(); + brokerService.setPersistent(false); + brokerService.addConnector("tcp://localhost:0"); + brokerService.addConnector("tcp://localhost:0"); + brokerService.start(); + + try { + + final TransportConnector primary = brokerService.getTransportConnectors().get(0); + final TransportConnector secondary = brokerService.getTransportConnectors().get(1); + + String failoverUrl = String.format("failover:(%s,%s)?maxReconnectAttempts=1&randomize=false", primary.getConnectUri(), secondary.getConnectUri()); + + ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter(); + ra.start(null); + ra.setServerUrl(failoverUrl); + ra.setUserName(user); + ra.setPassword(pwd); + + XAResource[] resources = ra.getXAResources(null); + assertEquals("one resource", 1, resources.length); + + assertEquals("no pending transactions", 0, resources[0].recover(100).length); + + primary.stop(); + + // should recover ok + assertEquals("no pending transactions", 0, resources[0].recover(100).length); + + primary.start(); + + // should be ok + assertEquals("no pending transactions", 0, resources[0].recover(100).length); + + secondary.stop(); + + // should recover ok + assertEquals("no pending transactions", 0, resources[0].recover(100).length); + + } finally { + brokerService.stop(); + } + + } + }