From 221a751d0a67e8d1f240d30a2f56c71a31d7627c Mon Sep 17 00:00:00 2001 From: gtully Date: Thu, 6 Feb 2014 15:17:13 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5031 - add connection factory rmIdFromConnectionId attribute to allow resource manager identity to be mapped to the connection id, thus overiding the default broker identity association. usefull is a TM does not end a start with the join flags. We may want to implement join but it is not trivial to determin the existing associated state and clear it --- .../apache/activemq/ActiveMQConnection.java | 12 ++++++ .../activemq/ActiveMQConnectionFactory.java | 19 ++++++++- .../activemq/ra/LocalAndXATransaction.java | 21 ++++++---- .../ActiveMQXAConnectionFactoryTest.java | 40 +++++++++++++++++++ 4 files changed, 84 insertions(+), 8 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java index a722e0ba3e..d5f1c1776d 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -205,6 +205,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon private boolean messagePrioritySupported = true; private boolean transactedIndividualAck = false; private boolean nonBlockingRedelivery = false; + private boolean rmIdFromConnectionId = false; private int maxThreadPoolSize = DEFAULT_THREAD_POOL_SIZE; private RejectedExecutionHandler rejectedTaskHandler = null; @@ -1654,6 +1655,9 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * @throws JMSException */ public String getResourceManagerId() throws JMSException { + if (isRmIdFromConnectionId()) { + return info.getConnectionId().getValue(); + } waitForBrokerInfo(); if (brokerInfo == null) { throw new JMSException("Connection failed before Broker info was received."); @@ -2590,6 +2594,14 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon this.nonBlockingRedelivery = nonBlockingRedelivery; } + public boolean isRmIdFromConnectionId() { + return rmIdFromConnectionId; + } + + public void setRmIdFromConnectionId(boolean rmIdFromConnectionId) { + this.rmIdFromConnectionId = rmIdFromConnectionId; + } + /** * Removes any TempDestinations that this connection has cached, ignoring * any exceptions generated because the destination is in use as they should diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java index 8495d84f99..35d4a69c9f 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java @@ -179,6 +179,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne private TaskRunnerFactory sessionTaskRunner; private RejectedExecutionHandler rejectedTaskHandler = null; protected int xaAckMode = -1; // ensure default init before setting via brokerUrl introspection in sub class + private boolean rmIdFromConnectionId = false; // ///////////////////////////////////////////// // @@ -221,7 +222,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne } } - /** + /*boolean* * @param brokerURL * @return * @throws URISyntaxException @@ -401,6 +402,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne connection.setSessionTaskRunner(getSessionTaskRunner()); connection.setRejectedTaskHandler(getRejectedTaskHandler()); connection.setNestedMapAndListEnabled(isNestedMapAndListEnabled()); + connection.setRmIdFromConnectionId(isRmIdFromConnectionId()); if (transportListener != null) { connection.addTransportListener(transportListener); } @@ -821,6 +823,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne props.setProperty("maxThreadPoolSize", Integer.toString(getMaxThreadPoolSize())); props.setProperty("nestedMapAndListEnabled", Boolean.toString(isNestedMapAndListEnabled())); props.setProperty("consumerFailoverRedeliveryWaitPeriod", Long.toString(getConsumerFailoverRedeliveryWaitPeriod())); + props.setProperty("rmIdFromConnectionId", Boolean.toString(isRmIdFromConnectionId())); } public boolean isUseCompression() { @@ -1205,4 +1208,18 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) { this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval; } + + + public boolean isRmIdFromConnectionId() { + return rmIdFromConnectionId; + } + + /** + * uses the connection id as the resource identity for XAResource.isSameRM + * ensuring join will only occur on a single connection + */ + public void setRmIdFromConnectionId(boolean rmIdFromConnectionId) { + this.rmIdFromConnectionId = rmIdFromConnectionId; + } + } diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java b/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java index e53cd5c181..0f27393a41 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java @@ -24,11 +24,14 @@ import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; import org.apache.activemq.TransactionContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Used to provide a LocalTransaction and XAResource to a JMS session. */ public class LocalAndXATransaction implements XAResource, LocalTransaction { + private static final Logger LOG = LoggerFactory.getLogger(LocalAndXATransaction.class); private final TransactionContext transactionContext; private boolean inManagedTx; @@ -86,6 +89,7 @@ public class LocalAndXATransaction implements XAResource, LocalTransaction { } public void end(Xid arg0, int arg1) throws XAException { + LOG.debug("{} end {} with {}", new Object[]{this, arg0, arg1}); try { transactionContext.end(arg0, arg1); } finally { @@ -106,14 +110,16 @@ public class LocalAndXATransaction implements XAResource, LocalTransaction { } public boolean isSameRM(XAResource xaresource) throws XAException { - if (xaresource == null) { - return false; + boolean isSame = false; + if (xaresource != null) { + // Do we have to unwrap? + if (xaresource instanceof LocalAndXATransaction) { + xaresource = ((LocalAndXATransaction)xaresource).transactionContext; + } + isSame = transactionContext.isSameRM(xaresource); } - // Do we have to unwrap? - if (xaresource instanceof LocalAndXATransaction) { - xaresource = ((LocalAndXATransaction)xaresource).transactionContext; - } - return transactionContext.isSameRM(xaresource); + LOG.trace("{} isSameRM({}) = {}", new Object[]{this, xaresource, isSame}); + return isSame; } public int prepare(Xid arg0) throws XAException { @@ -133,6 +139,7 @@ public class LocalAndXATransaction implements XAResource, LocalTransaction { } public void start(Xid arg0, int arg1) throws XAException { + LOG.trace("{} start {} with {}", new Object[]{this, arg0, arg1}); transactionContext.start(arg0, arg1); try { setInManagedTx(true); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java index 7da3035a86..5fd6e5c4b3 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java @@ -185,6 +185,46 @@ public class ActiveMQXAConnectionFactoryTest extends CombinationTestSupport { } } + public void testIsSameRMOverride() throws URISyntaxException, JMSException, XAException { + + XAConnection connection1 = null; + XAConnection connection2 = null; + try { + ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false&jms.rmIdFromConnectionId=true"); + connection1 = (XAConnection)cf1.createConnection(); + XASession session1 = connection1.createXASession(); + XAResource resource1 = session1.getXAResource(); + + ActiveMQXAConnectionFactory cf2 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false"); + connection2 = (XAConnection)cf2.createConnection(); + XASession session2 = connection2.createXASession(); + XAResource resource2 = session2.getXAResource(); + + assertFalse(resource1.isSameRM(resource2)); + + // ensure identity is preserved + XASession session1a = connection1.createXASession(); + assertTrue(resource1.isSameRM(session1a.getXAResource())); + session1.close(); + session2.close(); + } finally { + if (connection1 != null) { + try { + connection1.close(); + } catch (Exception e) { + // ignore + } + } + if (connection2 != null) { + try { + connection2.close(); + } catch (Exception e) { + // ignore + } + } + } + } + public void testVanilaTransactionalProduceReceive() throws Exception { XAConnection connection1 = null;