AMQ-7497 - support reconnect of the single RA xaResource connection

This commit is contained in:
gtully 2020-06-11 19:39:18 +01:00
parent d539ff77b9
commit ed41101755
3 changed files with 218 additions and 11 deletions

View File

@ -795,7 +795,7 @@ public class TransactionContext implements XAResource {
* @param e JMSException to convert
* @return XAException wrapping original exception or its message
*/
private XAException toXAException(JMSException e) {
public static XAException toXAException(JMSException e) {
if (e.getCause() != null && e.getCause() instanceof XAException) {
XAException original = (XAException)e.getCause();
XAException xae = new XAException(original.getMessage());
@ -818,7 +818,7 @@ public class TransactionContext implements XAResource {
return xae;
}
private int parseFromMessageOr(String message, int fallbackCode) {
private static int parseFromMessageOr(String message, int fallbackCode) {
final String marker = "xaErrorCode:";
final int index = message.lastIndexOf(marker);
if (index > -1) {

View File

@ -43,6 +43,8 @@ import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.activemq.TransactionContext.toXAException;
/**
* Knows how to connect to one ActiveMQ server. It can then activate endpoints
* and deliver messages to those end points using the connection configure in
@ -62,7 +64,7 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
private transient BrokerService broker;
private transient Thread brokerStartThread;
private ActiveMQConnectionFactory connectionFactory;
private transient TransactionContext xaRecoveryTransactionContext;
private transient ReconnectingXAResource reconnectingXaResource;
/**
*
@ -174,15 +176,13 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
ServiceSupport.dispose(broker);
broker = null;
}
if (xaRecoveryTransactionContext != null) {
try {
xaRecoveryTransactionContext.getConnection().close();
} catch (Throwable ignored) {}
if (reconnectingXaResource != null) {
reconnectingXaResource.stop();
}
}
this.bootstrapContext = null;
this.xaRecoveryTransactionContext = null;
this.reconnectingXaResource = null;
}
/**
@ -269,17 +269,120 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
}
try {
synchronized ( this ) {
if (xaRecoveryTransactionContext == null) {
if (reconnectingXaResource == null) {
LOG.debug("Init XAResource with: " + this.getInfo());
xaRecoveryTransactionContext = new TransactionContext(makeConnection());
reconnectingXaResource = new ReconnectingXAResource(new TransactionContext(makeConnection()));
}
}
return new XAResource[]{ xaRecoveryTransactionContext };
return new XAResource[]{reconnectingXaResource};
} catch (Exception e) {
throw new ResourceException(e);
}
}
private void ensureConnection(TransactionContext xaRecoveryTransactionContext) throws XAException {
final ActiveMQConnection existingConnection = xaRecoveryTransactionContext.getConnection();
if (existingConnection == null || existingConnection.isTransportFailed()) {
try {
LOG.debug("reconnect XAResource with: " + this.getInfo(), existingConnection == null ? "" : existingConnection.getFirstFailureError());
xaRecoveryTransactionContext.setConnection(makeConnection());
} catch (JMSException e) {
throw toXAException(e);
} finally {
if (existingConnection != null) {
try {
existingConnection.close();
} catch (Exception ignored) {
}
}
}
}
}
private class ReconnectingXAResource implements XAResource {
protected TransactionContext delegate;
ReconnectingXAResource(TransactionContext delegate) {
this.delegate = delegate;
}
@Override
public void commit(Xid xid, boolean b) throws XAException {
ensureConnection(delegate);
delegate.commit(xid, b);
}
@Override
public void end(Xid xid, int i) throws XAException {
ensureConnection(delegate);
delegate.end(xid, i);
}
@Override
public void forget(Xid xid) throws XAException {
ensureConnection(delegate);
delegate.forget(xid);
}
@Override
public int getTransactionTimeout() throws XAException {
ensureConnection(delegate);
return delegate.getTransactionTimeout();
}
@Override
public boolean isSameRM(XAResource xaResource) throws XAException {
if (this == xaResource) {
return true;
}
if (!(xaResource instanceof ReconnectingXAResource)) {
return false;
}
ensureConnection(delegate);
return delegate.isSameRM(((ReconnectingXAResource)xaResource).delegate);
}
@Override
public int prepare(Xid xid) throws XAException {
ensureConnection(delegate);
return delegate.prepare(xid);
}
@Override
public Xid[] recover(int i) throws XAException {
ensureConnection(delegate);
return delegate.recover(i);
}
@Override
public void rollback(Xid xid) throws XAException {
ensureConnection(delegate);
delegate.rollback(xid);
}
@Override
public boolean setTransactionTimeout(int i) throws XAException {
ensureConnection(delegate);
return delegate.setTransactionTimeout(i);
}
@Override
public void start(Xid xid, int i) throws XAException {
ensureConnection(delegate);
delegate.start(xid, i);
}
public void stop() {
try {
delegate.getConnection().close();
} catch (Throwable ignored) {}
}
};
// ///////////////////////////////////////////////////////////////////////
//
// Java Bean getters and setters for this ResourceAdapter class.

View File

@ -19,6 +19,7 @@ package org.apache.activemq.ra;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@ -32,6 +33,9 @@ import javax.transaction.xa.XAResource;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQTopicSubscriber;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.util.Wait;
import org.junit.Before;
import org.junit.Test;
@ -115,5 +119,105 @@ public class ActiveMQConnectionFactoryTest {
assertEquals("one resource", 1, resource2.length);
assertTrue("isSameRM true", resources[0].isSameRM(resource2[0]));
assertTrue("the same instance", resources[0].equals(resource2[0]));
ra.stop();
}
@Test
public void testXAResourceReconnect() throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setPersistent(false);
brokerService.addConnector("tcp://localhost:0");
brokerService.start();
try {
final TransportConnector transportConnector = brokerService.getTransportConnectors().get(0);
String failoverUrl = String.format("failover:(%s)?maxReconnectAttempts=1", transportConnector.getConnectUri());
ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter();
ra.start(null);
ra.setServerUrl(failoverUrl);
ra.setUserName(user);
ra.setPassword(pwd);
XAResource[] resources = ra.getXAResources(null);
assertEquals("one resource", 1, resources.length);
assertEquals("no pending transactions", 0, resources[0].recover(100).length);
transportConnector.stop();
assertTrue("no connections", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return transportConnector.getConnections().isEmpty();
}
}));
try {
resources[0].recover(100);
fail("Expect error on broken connection");
} catch (Exception expected) {
}
transportConnector.start();
// should recover ok
assertEquals("no pending transactions", 0, resources[0].recover(100).length);
} finally {
brokerService.stop();
}
}
@Test
public void testXAResourceFailoverFailBack() throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setPersistent(false);
brokerService.addConnector("tcp://localhost:0");
brokerService.addConnector("tcp://localhost:0");
brokerService.start();
try {
final TransportConnector primary = brokerService.getTransportConnectors().get(0);
final TransportConnector secondary = brokerService.getTransportConnectors().get(1);
String failoverUrl = String.format("failover:(%s,%s)?maxReconnectAttempts=1&randomize=false", primary.getConnectUri(), secondary.getConnectUri());
ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter();
ra.start(null);
ra.setServerUrl(failoverUrl);
ra.setUserName(user);
ra.setPassword(pwd);
XAResource[] resources = ra.getXAResources(null);
assertEquals("one resource", 1, resources.length);
assertEquals("no pending transactions", 0, resources[0].recover(100).length);
primary.stop();
// should recover ok
assertEquals("no pending transactions", 0, resources[0].recover(100).length);
primary.start();
// should be ok
assertEquals("no pending transactions", 0, resources[0].recover(100).length);
secondary.stop();
// should recover ok
assertEquals("no pending transactions", 0, resources[0].recover(100).length);
} finally {
brokerService.stop();
}
}
}