mirror of
https://github.com/apache/activemq-artemis.git
synced 2025-02-28 14:29:31 +00:00
ARTEMIS-3707 remove use of TransactionManager
The TransactionManager interface defines the methods that allow an *application server* to manage transaction boundaries. However, the TransactionSynchronizationRegistry is intended for use by system level application server components such as persistence managers, *resource adapters*, as well as EJB and Web application components.
This commit is contained in:
parent
51c1504b05
commit
59a7257ce5
@ -133,7 +133,7 @@ public class ActiveMQRAConnectionFactoryImpl implements ActiveMQRAConnectionFact
|
||||
public QueueConnection createQueueConnection() throws JMSException {
|
||||
logger.trace("createQueueConnection()");
|
||||
|
||||
ActiveMQRASessionFactoryImpl s = new ActiveMQRASessionFactoryImpl(mcf, cm, getResourceAdapter().getTM(), ActiveMQRAConnectionFactory.QUEUE_CONNECTION);
|
||||
ActiveMQRASessionFactoryImpl s = new ActiveMQRASessionFactoryImpl(mcf, cm, getResourceAdapter().getTSR(), ActiveMQRAConnectionFactory.QUEUE_CONNECTION);
|
||||
|
||||
logger.trace("Created queue connection: {}", s);
|
||||
|
||||
@ -152,7 +152,7 @@ public class ActiveMQRAConnectionFactoryImpl implements ActiveMQRAConnectionFact
|
||||
public QueueConnection createQueueConnection(final String userName, final String password) throws JMSException {
|
||||
logger.trace("createQueueConnection({}, ****)", userName);
|
||||
|
||||
ActiveMQRASessionFactoryImpl s = new ActiveMQRASessionFactoryImpl(mcf, cm, getResourceAdapter().getTM(), ActiveMQRAConnectionFactory.QUEUE_CONNECTION);
|
||||
ActiveMQRASessionFactoryImpl s = new ActiveMQRASessionFactoryImpl(mcf, cm, getResourceAdapter().getTSR(), ActiveMQRAConnectionFactory.QUEUE_CONNECTION);
|
||||
s.setUserName(userName);
|
||||
s.setPassword(password);
|
||||
|
||||
@ -173,7 +173,7 @@ public class ActiveMQRAConnectionFactoryImpl implements ActiveMQRAConnectionFact
|
||||
public TopicConnection createTopicConnection() throws JMSException {
|
||||
logger.trace("createTopicConnection()");
|
||||
|
||||
ActiveMQRASessionFactoryImpl s = new ActiveMQRASessionFactoryImpl(mcf, cm, getResourceAdapter().getTM(), ActiveMQRAConnectionFactory.TOPIC_CONNECTION);
|
||||
ActiveMQRASessionFactoryImpl s = new ActiveMQRASessionFactoryImpl(mcf, cm, getResourceAdapter().getTSR(), ActiveMQRAConnectionFactory.TOPIC_CONNECTION);
|
||||
|
||||
logger.trace("Created topic connection: {}", s);
|
||||
|
||||
@ -192,7 +192,7 @@ public class ActiveMQRAConnectionFactoryImpl implements ActiveMQRAConnectionFact
|
||||
public TopicConnection createTopicConnection(final String userName, final String password) throws JMSException {
|
||||
logger.trace("createTopicConnection({}, ****)", userName);
|
||||
|
||||
ActiveMQRASessionFactoryImpl s = new ActiveMQRASessionFactoryImpl(mcf, cm, getResourceAdapter().getTM(), ActiveMQRAConnectionFactory.TOPIC_CONNECTION);
|
||||
ActiveMQRASessionFactoryImpl s = new ActiveMQRASessionFactoryImpl(mcf, cm, getResourceAdapter().getTSR(), ActiveMQRAConnectionFactory.TOPIC_CONNECTION);
|
||||
s.setUserName(userName);
|
||||
s.setPassword(password);
|
||||
validateUser(s);
|
||||
@ -212,7 +212,7 @@ public class ActiveMQRAConnectionFactoryImpl implements ActiveMQRAConnectionFact
|
||||
public Connection createConnection() throws JMSException {
|
||||
logger.trace("createConnection()");
|
||||
|
||||
ActiveMQRASessionFactoryImpl s = new ActiveMQRASessionFactoryImpl(mcf, cm, getResourceAdapter().getTM(), ActiveMQRAConnectionFactory.CONNECTION);
|
||||
ActiveMQRASessionFactoryImpl s = new ActiveMQRASessionFactoryImpl(mcf, cm, getResourceAdapter().getTSR(), ActiveMQRAConnectionFactory.CONNECTION);
|
||||
|
||||
logger.trace("Created connection: {}", s);
|
||||
|
||||
@ -231,7 +231,7 @@ public class ActiveMQRAConnectionFactoryImpl implements ActiveMQRAConnectionFact
|
||||
public Connection createConnection(final String userName, final String password) throws JMSException {
|
||||
logger.trace("createConnection({}, ****)", userName);
|
||||
|
||||
ActiveMQRASessionFactoryImpl s = new ActiveMQRASessionFactoryImpl(mcf, cm, getResourceAdapter().getTM(), ActiveMQRAConnectionFactory.CONNECTION);
|
||||
ActiveMQRASessionFactoryImpl s = new ActiveMQRASessionFactoryImpl(mcf, cm, getResourceAdapter().getTSR(), ActiveMQRAConnectionFactory.CONNECTION);
|
||||
s.setUserName(userName);
|
||||
s.setPassword(password);
|
||||
|
||||
@ -252,7 +252,7 @@ public class ActiveMQRAConnectionFactoryImpl implements ActiveMQRAConnectionFact
|
||||
public XAQueueConnection createXAQueueConnection() throws JMSException {
|
||||
logger.trace("createXAQueueConnection()");
|
||||
|
||||
ActiveMQRASessionFactoryImpl s = new ActiveMQRASessionFactoryImpl(mcf, cm, getResourceAdapter().getTM(), ActiveMQRAConnectionFactory.XA_QUEUE_CONNECTION);
|
||||
ActiveMQRASessionFactoryImpl s = new ActiveMQRASessionFactoryImpl(mcf, cm, getResourceAdapter().getTSR(), ActiveMQRAConnectionFactory.XA_QUEUE_CONNECTION);
|
||||
|
||||
logger.trace("Created queue connection: {}", s);
|
||||
|
||||
@ -271,7 +271,7 @@ public class ActiveMQRAConnectionFactoryImpl implements ActiveMQRAConnectionFact
|
||||
public XAQueueConnection createXAQueueConnection(final String userName, final String password) throws JMSException {
|
||||
logger.trace("createXAQueueConnection({}, ****)", userName);
|
||||
|
||||
ActiveMQRASessionFactoryImpl s = new ActiveMQRASessionFactoryImpl(mcf, cm, getResourceAdapter().getTM(), ActiveMQRAConnectionFactory.XA_QUEUE_CONNECTION);
|
||||
ActiveMQRASessionFactoryImpl s = new ActiveMQRASessionFactoryImpl(mcf, cm, getResourceAdapter().getTSR(), ActiveMQRAConnectionFactory.XA_QUEUE_CONNECTION);
|
||||
s.setUserName(userName);
|
||||
s.setPassword(password);
|
||||
validateUser(s);
|
||||
@ -291,7 +291,7 @@ public class ActiveMQRAConnectionFactoryImpl implements ActiveMQRAConnectionFact
|
||||
public XATopicConnection createXATopicConnection() throws JMSException {
|
||||
logger.trace("createXATopicConnection()");
|
||||
|
||||
ActiveMQRASessionFactoryImpl s = new ActiveMQRASessionFactoryImpl(mcf, cm, getResourceAdapter().getTM(), ActiveMQRAConnectionFactory.XA_TOPIC_CONNECTION);
|
||||
ActiveMQRASessionFactoryImpl s = new ActiveMQRASessionFactoryImpl(mcf, cm, getResourceAdapter().getTSR(), ActiveMQRAConnectionFactory.XA_TOPIC_CONNECTION);
|
||||
|
||||
logger.trace("Created topic connection: {}", s);
|
||||
|
||||
@ -310,7 +310,7 @@ public class ActiveMQRAConnectionFactoryImpl implements ActiveMQRAConnectionFact
|
||||
public XATopicConnection createXATopicConnection(final String userName, final String password) throws JMSException {
|
||||
logger.trace("createXATopicConnection({}, ****)", userName);
|
||||
|
||||
ActiveMQRASessionFactoryImpl s = new ActiveMQRASessionFactoryImpl(mcf, cm, getResourceAdapter().getTM(), ActiveMQRAConnectionFactory.XA_TOPIC_CONNECTION);
|
||||
ActiveMQRASessionFactoryImpl s = new ActiveMQRASessionFactoryImpl(mcf, cm, getResourceAdapter().getTSR(), ActiveMQRAConnectionFactory.XA_TOPIC_CONNECTION);
|
||||
s.setUserName(userName);
|
||||
s.setPassword(password);
|
||||
validateUser(s);
|
||||
@ -330,7 +330,7 @@ public class ActiveMQRAConnectionFactoryImpl implements ActiveMQRAConnectionFact
|
||||
public XAConnection createXAConnection() throws JMSException {
|
||||
logger.trace("createXAConnection()");
|
||||
|
||||
ActiveMQRASessionFactoryImpl s = new ActiveMQRASessionFactoryImpl(mcf, cm, getResourceAdapter().getTM(), ActiveMQRAConnectionFactory.XA_CONNECTION);
|
||||
ActiveMQRASessionFactoryImpl s = new ActiveMQRASessionFactoryImpl(mcf, cm, getResourceAdapter().getTSR(), ActiveMQRAConnectionFactory.XA_CONNECTION);
|
||||
|
||||
logger.trace("Created connection: {}", s);
|
||||
|
||||
@ -349,7 +349,7 @@ public class ActiveMQRAConnectionFactoryImpl implements ActiveMQRAConnectionFact
|
||||
public XAConnection createXAConnection(final String userName, final String password) throws JMSException {
|
||||
logger.trace("createXAConnection({}, ****)", userName);
|
||||
|
||||
ActiveMQRASessionFactoryImpl s = new ActiveMQRASessionFactoryImpl(mcf, cm, getResourceAdapter().getTM(), ActiveMQRAConnectionFactory.XA_CONNECTION);
|
||||
ActiveMQRASessionFactoryImpl s = new ActiveMQRASessionFactoryImpl(mcf, cm, getResourceAdapter().getTSR(), ActiveMQRAConnectionFactory.XA_CONNECTION);
|
||||
s.setUserName(userName);
|
||||
s.setPassword(password);
|
||||
validateUser(s);
|
||||
@ -372,7 +372,7 @@ public class ActiveMQRAConnectionFactoryImpl implements ActiveMQRAConnectionFact
|
||||
@Override
|
||||
public JMSContext createContext(String userName, String password, int sessionMode) {
|
||||
@SuppressWarnings("resource")
|
||||
ActiveMQRASessionFactoryImpl conn = new ActiveMQRASessionFactoryImpl(mcf, cm, getResourceAdapter().getTM(), ActiveMQRAConnectionFactory.CONNECTION);
|
||||
ActiveMQRASessionFactoryImpl conn = new ActiveMQRASessionFactoryImpl(mcf, cm, getResourceAdapter().getTSR(), ActiveMQRAConnectionFactory.CONNECTION);
|
||||
conn.setUserName(userName);
|
||||
conn.setPassword(password);
|
||||
try {
|
||||
@ -401,7 +401,7 @@ public class ActiveMQRAConnectionFactoryImpl implements ActiveMQRAConnectionFact
|
||||
|
||||
@Override
|
||||
public XAJMSContext createXAContext(String userName, String password) {
|
||||
ActiveMQRASessionFactoryImpl conn = new ActiveMQRASessionFactoryImpl(mcf, cm, getResourceAdapter().getTM(), ActiveMQRAConnectionFactory.XA_CONNECTION);
|
||||
ActiveMQRASessionFactoryImpl conn = new ActiveMQRASessionFactoryImpl(mcf, cm, getResourceAdapter().getTSR(), ActiveMQRAConnectionFactory.XA_CONNECTION);
|
||||
conn.setUserName(userName);
|
||||
conn.setPassword(password);
|
||||
try {
|
||||
|
@ -16,6 +16,18 @@
|
||||
*/
|
||||
package org.apache.activemq.artemis.ra;
|
||||
|
||||
import java.io.PrintWriter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import javax.jms.ExceptionListener;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.ResourceAllocationException;
|
||||
@ -32,21 +44,8 @@ import javax.resource.spi.ManagedConnectionMetaData;
|
||||
import javax.resource.spi.SecurityException;
|
||||
import javax.security.auth.Subject;
|
||||
import javax.transaction.Status;
|
||||
import javax.transaction.SystemException;
|
||||
import javax.transaction.Transaction;
|
||||
import javax.transaction.TransactionManager;
|
||||
import javax.transaction.TransactionSynchronizationRegistry;
|
||||
import javax.transaction.xa.XAResource;
|
||||
import java.io.PrintWriter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
|
||||
@ -124,7 +123,7 @@ public final class ActiveMQRAManagedConnection implements ManagedConnection, Exc
|
||||
|
||||
private XAResource xaResource;
|
||||
|
||||
private final TransactionManager tm;
|
||||
private final TransactionSynchronizationRegistry tsr;
|
||||
|
||||
private boolean inManagedTx;
|
||||
|
||||
@ -147,7 +146,7 @@ public final class ActiveMQRAManagedConnection implements ManagedConnection, Exc
|
||||
|
||||
this.mcf = mcf;
|
||||
this.cri = cri;
|
||||
this.tm = ra.getTM();
|
||||
this.tsr = ra.getTSR();
|
||||
this.ra = ra;
|
||||
this.userName = userName;
|
||||
this.password = password;
|
||||
@ -338,20 +337,11 @@ public final class ActiveMQRAManagedConnection implements ManagedConnection, Exc
|
||||
|
||||
public void checkTransactionActive() throws JMSException {
|
||||
// don't bother looking at the transaction if there's an active XID
|
||||
if (!inManagedTx && tm != null) {
|
||||
try {
|
||||
Transaction tx = tm.getTransaction();
|
||||
if (tx != null) {
|
||||
int status = tx.getStatus();
|
||||
if (!inManagedTx && tsr != null) {
|
||||
int status = tsr.getTransactionStatus();
|
||||
// Only allow states that will actually succeed
|
||||
if (status != Status.STATUS_ACTIVE && status != Status.STATUS_PREPARING && status != Status.STATUS_PREPARED && status != Status.STATUS_COMMITTING) {
|
||||
throw new javax.jms.IllegalStateException("Transaction " + tx + " not active");
|
||||
}
|
||||
}
|
||||
} catch (SystemException e) {
|
||||
JMSException jmsE = new javax.jms.IllegalStateException("Unexpected exception on the Transaction ManagerTransaction");
|
||||
jmsE.initCause(e);
|
||||
throw jmsE;
|
||||
throw new javax.jms.IllegalStateException("Transaction " + tsr.getTransactionKey() + " not active");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -16,6 +16,10 @@
|
||||
*/
|
||||
package org.apache.activemq.artemis.ra;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.jms.ConnectionConsumer;
|
||||
import javax.jms.ConnectionMetaData;
|
||||
import javax.jms.Destination;
|
||||
@ -38,12 +42,8 @@ import javax.jms.XATopicSession;
|
||||
import javax.naming.Reference;
|
||||
import javax.resource.Referenceable;
|
||||
import javax.resource.spi.ConnectionManager;
|
||||
import javax.transaction.SystemException;
|
||||
import javax.transaction.Transaction;
|
||||
import javax.transaction.TransactionManager;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.Set;
|
||||
import javax.transaction.Status;
|
||||
import javax.transaction.TransactionSynchronizationRegistry;
|
||||
|
||||
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionForContext;
|
||||
@ -99,7 +99,7 @@ public final class ActiveMQRASessionFactoryImpl extends ActiveMQConnectionForCon
|
||||
* The managed connection factory
|
||||
*/
|
||||
private final ActiveMQRAManagedConnectionFactory mcf;
|
||||
private TransactionManager tm;
|
||||
private final TransactionSynchronizationRegistry tsr;
|
||||
|
||||
/**
|
||||
* The connection manager
|
||||
@ -132,11 +132,11 @@ public final class ActiveMQRASessionFactoryImpl extends ActiveMQConnectionForCon
|
||||
*/
|
||||
public ActiveMQRASessionFactoryImpl(final ActiveMQRAManagedConnectionFactory mcf,
|
||||
final ConnectionManager cm,
|
||||
final TransactionManager tm,
|
||||
final TransactionSynchronizationRegistry tsr,
|
||||
final int type) {
|
||||
this.mcf = mcf;
|
||||
|
||||
this.tm = tm;
|
||||
this.tsr = tsr;
|
||||
|
||||
if (cm == null) {
|
||||
this.cm = new ActiveMQRAConnectionManager();
|
||||
@ -847,14 +847,8 @@ public final class ActiveMQRASessionFactoryImpl extends ActiveMQConnectionForCon
|
||||
|
||||
private boolean inJtaTransaction() {
|
||||
boolean inJtaTx = false;
|
||||
if (tm != null) {
|
||||
Transaction tx = null;
|
||||
try {
|
||||
tx = tm.getTransaction();
|
||||
} catch (SystemException e) {
|
||||
//assume false
|
||||
}
|
||||
inJtaTx = tx != null;
|
||||
if (tsr != null) {
|
||||
inJtaTx = tsr.getTransactionStatus() != Status.STATUS_NO_TRANSACTION;
|
||||
}
|
||||
return inJtaTx;
|
||||
}
|
||||
|
@ -24,7 +24,7 @@ import javax.resource.spi.ResourceAdapter;
|
||||
import javax.resource.spi.ResourceAdapterInternalException;
|
||||
import javax.resource.spi.endpoint.MessageEndpointFactory;
|
||||
import javax.resource.spi.work.WorkManager;
|
||||
import javax.transaction.TransactionManager;
|
||||
import javax.transaction.TransactionSynchronizationRegistry;
|
||||
import javax.transaction.xa.XAResource;
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
@ -57,7 +57,6 @@ import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.ra.inflow.ActiveMQActivation;
|
||||
import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec;
|
||||
import org.apache.activemq.artemis.ra.recovery.RecoveryManager;
|
||||
import org.apache.activemq.artemis.service.extensions.ServiceUtils;
|
||||
import org.apache.activemq.artemis.service.extensions.xa.recovery.XARecoveryConfig;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -112,7 +111,7 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
|
||||
|
||||
private ActiveMQConnectionFactory recoveryActiveMQConnectionFactory;
|
||||
|
||||
private TransactionManager tm;
|
||||
private TransactionSynchronizationRegistry tsr;
|
||||
|
||||
private String unparsedJndiParams;
|
||||
|
||||
@ -147,8 +146,8 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
|
||||
recoveryManager = new RecoveryManager();
|
||||
}
|
||||
|
||||
public TransactionManager getTM() {
|
||||
return tm;
|
||||
public TransactionSynchronizationRegistry getTSR() {
|
||||
return tsr;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -233,7 +232,7 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
|
||||
public void start(final BootstrapContext ctx) throws ResourceAdapterInternalException {
|
||||
logger.trace("start({})", ctx);
|
||||
|
||||
tm = ServiceUtils.getTransactionManager();
|
||||
tsr = ctx.getTransactionSynchronizationRegistry();
|
||||
|
||||
recoveryManager.start(useAutoRecovery);
|
||||
|
||||
|
@ -16,18 +16,6 @@
|
||||
*/
|
||||
package org.apache.activemq.artemis.ra.inflow;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageListener;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Topic;
|
||||
import javax.naming.Context;
|
||||
import javax.naming.InitialContext;
|
||||
import javax.resource.ResourceException;
|
||||
import javax.resource.spi.endpoint.MessageEndpointFactory;
|
||||
import javax.resource.spi.work.Work;
|
||||
import javax.resource.spi.work.WorkManager;
|
||||
import javax.transaction.xa.XAResource;
|
||||
import java.lang.reflect.Method;
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
@ -38,12 +26,25 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageListener;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Topic;
|
||||
import javax.naming.Context;
|
||||
import javax.naming.InitialContext;
|
||||
import javax.resource.ResourceException;
|
||||
import javax.resource.spi.endpoint.MessageEndpointFactory;
|
||||
import javax.resource.spi.work.Work;
|
||||
import javax.resource.spi.work.WorkException;
|
||||
import javax.resource.spi.work.WorkManager;
|
||||
import javax.transaction.xa.XAResource;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
@ -309,7 +310,7 @@ public class ActiveMQActivation {
|
||||
cf = factory.getServerLocator().createSessionFactory();
|
||||
}
|
||||
session = setupSession(cf);
|
||||
ActiveMQMessageHandler handler = new ActiveMQMessageHandler(factory, this, ra.getTM(), (ClientSessionInternal) session, cf, i);
|
||||
ActiveMQMessageHandler handler = new ActiveMQMessageHandler(factory, this, ra.getTSR(), (ClientSessionInternal) session, cf, i);
|
||||
handler.setup();
|
||||
handlers.add(handler);
|
||||
} catch (Exception e) {
|
||||
|
@ -16,18 +16,17 @@
|
||||
*/
|
||||
package org.apache.activemq.artemis.ra.inflow;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
import javax.jms.MessageListener;
|
||||
import javax.resource.ResourceException;
|
||||
import javax.resource.spi.endpoint.MessageEndpoint;
|
||||
import javax.resource.spi.endpoint.MessageEndpointFactory;
|
||||
import javax.transaction.Status;
|
||||
import javax.transaction.SystemException;
|
||||
import javax.transaction.Transaction;
|
||||
import javax.transaction.TransactionManager;
|
||||
import javax.transaction.TransactionSynchronizationRegistry;
|
||||
import javax.transaction.xa.XAResource;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||
@ -88,7 +87,7 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
|
||||
|
||||
private final int sessionNr;
|
||||
|
||||
private final TransactionManager tm;
|
||||
private final TransactionSynchronizationRegistry tsr;
|
||||
|
||||
private ClientSessionFactory cf;
|
||||
|
||||
@ -98,7 +97,7 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
|
||||
|
||||
public ActiveMQMessageHandler(final ConnectionFactoryOptions options,
|
||||
final ActiveMQActivation activation,
|
||||
final TransactionManager tm,
|
||||
final TransactionSynchronizationRegistry tsr,
|
||||
final ClientSessionInternal session,
|
||||
final ClientSessionFactory cf,
|
||||
final int sessionNr) {
|
||||
@ -107,7 +106,7 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
|
||||
this.session = session;
|
||||
this.cf = cf;
|
||||
this.sessionNr = sessionNr;
|
||||
this.tm = tm;
|
||||
this.tsr = tsr;
|
||||
}
|
||||
|
||||
public void setup() throws Exception {
|
||||
@ -299,10 +298,6 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
|
||||
boolean beforeDelivery = false;
|
||||
|
||||
try {
|
||||
if (activation.getActivationSpec().getTransactionTimeout() > 0 && tm != null) {
|
||||
tm.setTransactionTimeout(activation.getActivationSpec().getTransactionTimeout());
|
||||
}
|
||||
|
||||
logger.trace("ActiveMQMessageHandler::calling beforeDelivery on message {}", message);
|
||||
|
||||
endpoint.beforeDelivery(ActiveMQActivation.ONMESSAGE);
|
||||
@ -342,27 +337,16 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
|
||||
ActiveMQRALogger.LOGGER.errorDeliveringMessage(e);
|
||||
// we need to call before/afterDelivery as a pair
|
||||
int status = Status.STATUS_NO_TRANSACTION;
|
||||
if (useXA && tm != null) {
|
||||
try {
|
||||
status = tm.getStatus();
|
||||
} catch (SystemException e1) {
|
||||
//not sure we can do much more here
|
||||
}
|
||||
if (useXA && tsr != null) {
|
||||
status = tsr.getTransactionStatus();
|
||||
}
|
||||
if (beforeDelivery || status != Status.STATUS_NO_TRANSACTION) {
|
||||
if (useXA && tm != null) {
|
||||
if (useXA && tsr != null) {
|
||||
// This is the job for the container,
|
||||
// however if the container throws an exception because of some other errors,
|
||||
// there are situations where the container is not setting the rollback only
|
||||
// this is to avoid a scenario where afterDelivery would kick in
|
||||
try {
|
||||
Transaction tx = tm.getTransaction();
|
||||
if (tx != null) {
|
||||
tx.setRollbackOnly();
|
||||
}
|
||||
} catch (Exception e1) {
|
||||
ActiveMQRALogger.LOGGER.unableToClearTheTransaction(e1);
|
||||
}
|
||||
tsr.setRollbackOnly();
|
||||
}
|
||||
|
||||
MessageEndpoint endToUse = endpoint;
|
||||
|
Loading…
x
Reference in New Issue
Block a user