https://issues.apache.org/jira/browse/AMQ-5031 - add connection factory rmIdFromConnectionId attribute to allow resource manager identity to be mapped to the connection id, thus overiding the default broker identity association. usefull is a TM does not end a start with the join flags. We may want to implement join but it is not trivial to determin the existing associated state and clear it

This commit is contained in:
gtully 2014-02-06 15:17:13 +00:00
parent 117f86e10e
commit 221a751d0a
4 changed files with 84 additions and 8 deletions

View File

@ -205,6 +205,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private boolean messagePrioritySupported = true; private boolean messagePrioritySupported = true;
private boolean transactedIndividualAck = false; private boolean transactedIndividualAck = false;
private boolean nonBlockingRedelivery = false; private boolean nonBlockingRedelivery = false;
private boolean rmIdFromConnectionId = false;
private int maxThreadPoolSize = DEFAULT_THREAD_POOL_SIZE; private int maxThreadPoolSize = DEFAULT_THREAD_POOL_SIZE;
private RejectedExecutionHandler rejectedTaskHandler = null; private RejectedExecutionHandler rejectedTaskHandler = null;
@ -1654,6 +1655,9 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* @throws JMSException * @throws JMSException
*/ */
public String getResourceManagerId() throws JMSException { public String getResourceManagerId() throws JMSException {
if (isRmIdFromConnectionId()) {
return info.getConnectionId().getValue();
}
waitForBrokerInfo(); waitForBrokerInfo();
if (brokerInfo == null) { if (brokerInfo == null) {
throw new JMSException("Connection failed before Broker info was received."); throw new JMSException("Connection failed before Broker info was received.");
@ -2590,6 +2594,14 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
this.nonBlockingRedelivery = nonBlockingRedelivery; this.nonBlockingRedelivery = nonBlockingRedelivery;
} }
public boolean isRmIdFromConnectionId() {
return rmIdFromConnectionId;
}
public void setRmIdFromConnectionId(boolean rmIdFromConnectionId) {
this.rmIdFromConnectionId = rmIdFromConnectionId;
}
/** /**
* Removes any TempDestinations that this connection has cached, ignoring * Removes any TempDestinations that this connection has cached, ignoring
* any exceptions generated because the destination is in use as they should * any exceptions generated because the destination is in use as they should

View File

@ -179,6 +179,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
private TaskRunnerFactory sessionTaskRunner; private TaskRunnerFactory sessionTaskRunner;
private RejectedExecutionHandler rejectedTaskHandler = null; private RejectedExecutionHandler rejectedTaskHandler = null;
protected int xaAckMode = -1; // ensure default init before setting via brokerUrl introspection in sub class protected int xaAckMode = -1; // ensure default init before setting via brokerUrl introspection in sub class
private boolean rmIdFromConnectionId = false;
// ///////////////////////////////////////////// // /////////////////////////////////////////////
// //
@ -221,7 +222,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
} }
} }
/** /*boolean*
* @param brokerURL * @param brokerURL
* @return * @return
* @throws URISyntaxException * @throws URISyntaxException
@ -401,6 +402,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
connection.setSessionTaskRunner(getSessionTaskRunner()); connection.setSessionTaskRunner(getSessionTaskRunner());
connection.setRejectedTaskHandler(getRejectedTaskHandler()); connection.setRejectedTaskHandler(getRejectedTaskHandler());
connection.setNestedMapAndListEnabled(isNestedMapAndListEnabled()); connection.setNestedMapAndListEnabled(isNestedMapAndListEnabled());
connection.setRmIdFromConnectionId(isRmIdFromConnectionId());
if (transportListener != null) { if (transportListener != null) {
connection.addTransportListener(transportListener); connection.addTransportListener(transportListener);
} }
@ -821,6 +823,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
props.setProperty("maxThreadPoolSize", Integer.toString(getMaxThreadPoolSize())); props.setProperty("maxThreadPoolSize", Integer.toString(getMaxThreadPoolSize()));
props.setProperty("nestedMapAndListEnabled", Boolean.toString(isNestedMapAndListEnabled())); props.setProperty("nestedMapAndListEnabled", Boolean.toString(isNestedMapAndListEnabled()));
props.setProperty("consumerFailoverRedeliveryWaitPeriod", Long.toString(getConsumerFailoverRedeliveryWaitPeriod())); props.setProperty("consumerFailoverRedeliveryWaitPeriod", Long.toString(getConsumerFailoverRedeliveryWaitPeriod()));
props.setProperty("rmIdFromConnectionId", Boolean.toString(isRmIdFromConnectionId()));
} }
public boolean isUseCompression() { public boolean isUseCompression() {
@ -1205,4 +1208,18 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) { public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval; this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
} }
public boolean isRmIdFromConnectionId() {
return rmIdFromConnectionId;
}
/**
* uses the connection id as the resource identity for XAResource.isSameRM
* ensuring join will only occur on a single connection
*/
public void setRmIdFromConnectionId(boolean rmIdFromConnectionId) {
this.rmIdFromConnectionId = rmIdFromConnectionId;
}
} }

View File

@ -24,11 +24,14 @@ import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid; import javax.transaction.xa.Xid;
import org.apache.activemq.TransactionContext; import org.apache.activemq.TransactionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* Used to provide a LocalTransaction and XAResource to a JMS session. * Used to provide a LocalTransaction and XAResource to a JMS session.
*/ */
public class LocalAndXATransaction implements XAResource, LocalTransaction { public class LocalAndXATransaction implements XAResource, LocalTransaction {
private static final Logger LOG = LoggerFactory.getLogger(LocalAndXATransaction.class);
private final TransactionContext transactionContext; private final TransactionContext transactionContext;
private boolean inManagedTx; private boolean inManagedTx;
@ -86,6 +89,7 @@ public class LocalAndXATransaction implements XAResource, LocalTransaction {
} }
public void end(Xid arg0, int arg1) throws XAException { public void end(Xid arg0, int arg1) throws XAException {
LOG.debug("{} end {} with {}", new Object[]{this, arg0, arg1});
try { try {
transactionContext.end(arg0, arg1); transactionContext.end(arg0, arg1);
} finally { } finally {
@ -106,14 +110,16 @@ public class LocalAndXATransaction implements XAResource, LocalTransaction {
} }
public boolean isSameRM(XAResource xaresource) throws XAException { public boolean isSameRM(XAResource xaresource) throws XAException {
if (xaresource == null) { boolean isSame = false;
return false; if (xaresource != null) {
// Do we have to unwrap?
if (xaresource instanceof LocalAndXATransaction) {
xaresource = ((LocalAndXATransaction)xaresource).transactionContext;
}
isSame = transactionContext.isSameRM(xaresource);
} }
// Do we have to unwrap? LOG.trace("{} isSameRM({}) = {}", new Object[]{this, xaresource, isSame});
if (xaresource instanceof LocalAndXATransaction) { return isSame;
xaresource = ((LocalAndXATransaction)xaresource).transactionContext;
}
return transactionContext.isSameRM(xaresource);
} }
public int prepare(Xid arg0) throws XAException { public int prepare(Xid arg0) throws XAException {
@ -133,6 +139,7 @@ public class LocalAndXATransaction implements XAResource, LocalTransaction {
} }
public void start(Xid arg0, int arg1) throws XAException { public void start(Xid arg0, int arg1) throws XAException {
LOG.trace("{} start {} with {}", new Object[]{this, arg0, arg1});
transactionContext.start(arg0, arg1); transactionContext.start(arg0, arg1);
try { try {
setInManagedTx(true); setInManagedTx(true);

View File

@ -185,6 +185,46 @@ public class ActiveMQXAConnectionFactoryTest extends CombinationTestSupport {
} }
} }
public void testIsSameRMOverride() throws URISyntaxException, JMSException, XAException {
XAConnection connection1 = null;
XAConnection connection2 = null;
try {
ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false&jms.rmIdFromConnectionId=true");
connection1 = (XAConnection)cf1.createConnection();
XASession session1 = connection1.createXASession();
XAResource resource1 = session1.getXAResource();
ActiveMQXAConnectionFactory cf2 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false");
connection2 = (XAConnection)cf2.createConnection();
XASession session2 = connection2.createXASession();
XAResource resource2 = session2.getXAResource();
assertFalse(resource1.isSameRM(resource2));
// ensure identity is preserved
XASession session1a = connection1.createXASession();
assertTrue(resource1.isSameRM(session1a.getXAResource()));
session1.close();
session2.close();
} finally {
if (connection1 != null) {
try {
connection1.close();
} catch (Exception e) {
// ignore
}
}
if (connection2 != null) {
try {
connection2.close();
} catch (Exception e) {
// ignore
}
}
}
}
public void testVanilaTransactionalProduceReceive() throws Exception { public void testVanilaTransactionalProduceReceive() throws Exception {
XAConnection connection1 = null; XAConnection connection1 = null;