This closes #2475
This commit is contained in:
commit
5aeee1fcea
|
@ -20,6 +20,8 @@ import javax.jms.MessageListener;
|
||||||
import javax.resource.ResourceException;
|
import javax.resource.ResourceException;
|
||||||
import javax.resource.spi.endpoint.MessageEndpoint;
|
import javax.resource.spi.endpoint.MessageEndpoint;
|
||||||
import javax.resource.spi.endpoint.MessageEndpointFactory;
|
import javax.resource.spi.endpoint.MessageEndpointFactory;
|
||||||
|
import javax.transaction.Status;
|
||||||
|
import javax.transaction.SystemException;
|
||||||
import javax.transaction.Transaction;
|
import javax.transaction.Transaction;
|
||||||
import javax.transaction.TransactionManager;
|
import javax.transaction.TransactionManager;
|
||||||
import javax.transaction.xa.XAResource;
|
import javax.transaction.xa.XAResource;
|
||||||
|
@ -344,7 +346,15 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
ActiveMQRALogger.LOGGER.errorDeliveringMessage(e);
|
ActiveMQRALogger.LOGGER.errorDeliveringMessage(e);
|
||||||
// we need to call before/afterDelivery as a pair
|
// we need to call before/afterDelivery as a pair
|
||||||
if (beforeDelivery) {
|
int status = Status.STATUS_NO_TRANSACTION;
|
||||||
|
if (useXA && tm != null) {
|
||||||
|
try {
|
||||||
|
status = tm.getStatus();
|
||||||
|
} catch (SystemException e1) {
|
||||||
|
//not sure we can do much more here
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (beforeDelivery || status != Status.STATUS_NO_TRANSACTION) {
|
||||||
if (useXA && tm != null) {
|
if (useXA && tm != null) {
|
||||||
// This is the job for the container,
|
// This is the job for the container,
|
||||||
// however if the container throws an exception because of some other errors,
|
// however if the container throws an exception because of some other errors,
|
||||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.ra;
|
||||||
|
|
||||||
import javax.jms.Message;
|
import javax.jms.Message;
|
||||||
import javax.resource.ResourceException;
|
import javax.resource.ResourceException;
|
||||||
|
import javax.resource.spi.ApplicationServerInternalException;
|
||||||
import javax.transaction.xa.XAException;
|
import javax.transaction.xa.XAException;
|
||||||
import javax.transaction.xa.XAResource;
|
import javax.transaction.xa.XAResource;
|
||||||
import javax.transaction.xa.Xid;
|
import javax.transaction.xa.Xid;
|
||||||
|
@ -33,6 +34,7 @@ import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
|
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
|
||||||
import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter;
|
import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter;
|
||||||
import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec;
|
import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec;
|
||||||
|
import org.apache.activemq.artemis.service.extensions.ServiceUtils;
|
||||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -74,6 +76,36 @@ public class ActiveMQMessageHandlerXATest extends ActiveMQRATestBase {
|
||||||
qResourceAdapter.stop();
|
qResourceAdapter.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testXABeginFails() throws Exception {
|
||||||
|
ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter();
|
||||||
|
MyBootstrapContext ctx = new MyBootstrapContext();
|
||||||
|
qResourceAdapter.start(ctx);
|
||||||
|
ActiveMQActivationSpec spec = new ActiveMQActivationSpec();
|
||||||
|
spec.setResourceAdapter(qResourceAdapter);
|
||||||
|
spec.setUseJNDI(false);
|
||||||
|
spec.setDestinationType("javax.jms.Queue");
|
||||||
|
spec.setDestination(MDBQUEUE);
|
||||||
|
qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
|
||||||
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
XADummyEndpointBegin endpoint = new XADummyEndpointBegin(latch);
|
||||||
|
DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, true);
|
||||||
|
qResourceAdapter.endpointActivation(endpointFactory, spec);
|
||||||
|
ClientSession session = locator.createSessionFactory().createSession();
|
||||||
|
ClientProducer clientProducer = session.createProducer(MDBQUEUEPREFIXED);
|
||||||
|
ClientMessage message = session.createMessage(true);
|
||||||
|
message.getBodyBuffer().writeString("teststring");
|
||||||
|
clientProducer.send(message);
|
||||||
|
session.close();
|
||||||
|
latch.await(5, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
DummyTransaction transaction = (DummyTransaction) ServiceUtils.getTransactionManager().getTransaction();
|
||||||
|
assertTrue(transaction.rollbackOnly);
|
||||||
|
assertTrue(endpoint.afterDelivery);
|
||||||
|
qResourceAdapter.endpointDeactivation(endpointFactory, spec);
|
||||||
|
qResourceAdapter.stop();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testXACommitWhenStopping() throws Exception {
|
public void testXACommitWhenStopping() throws Exception {
|
||||||
ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter();
|
ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter();
|
||||||
|
@ -268,4 +300,28 @@ public class ActiveMQMessageHandlerXATest extends ActiveMQRATestBase {
|
||||||
super.release();
|
super.release();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class XADummyEndpointBegin extends XADummyEndpoint {
|
||||||
|
|
||||||
|
private boolean afterDelivery = false;
|
||||||
|
|
||||||
|
XADummyEndpointBegin(CountDownLatch latch) {
|
||||||
|
super(latch);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void beforeDelivery(Method method) throws NoSuchMethodException, ResourceException {
|
||||||
|
super.beforeDelivery(method);
|
||||||
|
DummyTransactionManager dummyTransactionManager = (DummyTransactionManager) ServiceUtils.getTransactionManager();
|
||||||
|
DummyTransaction tx = new DummyTransaction();
|
||||||
|
dummyTransactionManager.tx = tx;
|
||||||
|
throw new ApplicationServerInternalException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterDelivery() throws ResourceException {
|
||||||
|
afterDelivery = true;
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.ra;
|
||||||
import javax.transaction.HeuristicMixedException;
|
import javax.transaction.HeuristicMixedException;
|
||||||
import javax.transaction.HeuristicRollbackException;
|
import javax.transaction.HeuristicRollbackException;
|
||||||
import javax.transaction.RollbackException;
|
import javax.transaction.RollbackException;
|
||||||
|
import javax.transaction.Status;
|
||||||
import javax.transaction.Synchronization;
|
import javax.transaction.Synchronization;
|
||||||
import javax.transaction.SystemException;
|
import javax.transaction.SystemException;
|
||||||
import javax.transaction.Transaction;
|
import javax.transaction.Transaction;
|
||||||
|
@ -32,6 +33,9 @@ import javax.transaction.xa.XAResource;
|
||||||
* To change this template use File | Settings | File Templates.
|
* To change this template use File | Settings | File Templates.
|
||||||
*/
|
*/
|
||||||
class DummyTransaction implements Transaction {
|
class DummyTransaction implements Transaction {
|
||||||
|
public boolean rollbackOnly = false;
|
||||||
|
|
||||||
|
public int status = Status.STATUS_ACTIVE;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException, SecurityException, SystemException {
|
public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException, SecurityException, SystemException {
|
||||||
|
@ -43,11 +47,12 @@ class DummyTransaction implements Transaction {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setRollbackOnly() throws IllegalStateException, SystemException {
|
public void setRollbackOnly() throws IllegalStateException, SystemException {
|
||||||
|
rollbackOnly = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getStatus() throws SystemException {
|
public int getStatus() throws SystemException {
|
||||||
return 0;
|
return status;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -27,7 +27,7 @@ import javax.transaction.TransactionManager;
|
||||||
|
|
||||||
public class DummyTransactionManager implements TransactionManager {
|
public class DummyTransactionManager implements TransactionManager {
|
||||||
|
|
||||||
protected static DummyTransactionManager tm = new DummyTransactionManager();
|
public static DummyTransactionManager tm = new DummyTransactionManager();
|
||||||
|
|
||||||
public Transaction tx;
|
public Transaction tx;
|
||||||
|
|
||||||
|
@ -49,7 +49,7 @@ public class DummyTransactionManager implements TransactionManager {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getStatus() throws SystemException {
|
public int getStatus() throws SystemException {
|
||||||
return 0;
|
return tx.getStatus();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -26,6 +26,7 @@ import javax.transaction.Transaction;
|
||||||
import javax.transaction.TransactionManager;
|
import javax.transaction.TransactionManager;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.service.extensions.transactions.TransactionManagerLocator;
|
import org.apache.activemq.artemis.service.extensions.transactions.TransactionManagerLocator;
|
||||||
|
import org.apache.activemq.artemis.tests.integration.ra.DummyTransactionManager;
|
||||||
|
|
||||||
public class DummyTransactionManagerLocator implements TransactionManagerLocator, TransactionManager {
|
public class DummyTransactionManagerLocator implements TransactionManagerLocator, TransactionManager {
|
||||||
|
|
||||||
|
@ -76,6 +77,6 @@ public class DummyTransactionManagerLocator implements TransactionManagerLocator
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TransactionManager getTransactionManager() {
|
public TransactionManager getTransactionManager() {
|
||||||
return this;
|
return DummyTransactionManager.tm;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue