From 51c34c87d223fdc971e87a0c857d123566a6f90f Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Mon, 15 Jun 2015 16:27:54 -0400 Subject: [PATCH] ARTEMIS-136 - XA Error fix with proper exception error https://issues.apache.org/jira/browse/ARTEMIS-136 From what I researched from implementers of XA TM if you throw ERR over communication errors the transaction manager will create an heuristic transaction to be manually dealt with. Other XA Implementations (such as Oracle JDBC) are return FAIL over communication failures during any XA operation. --- .../core/client/impl/ClientSessionImpl.java | 28 +- .../protocol/openwire/amq/AMQTransaction.java | 4 +- .../core/server/impl/ServerSessionImpl.java | 2 +- .../recovery/ActiveMQXAResourceWrapper.java | 2 +- .../InterruptedMessageHandlerTest.java | 384 ++++++++++++++++++ .../integration/client/SessionCloseTest.java | 12 +- 6 files changed, 408 insertions(+), 24 deletions(-) create mode 100644 tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/InterruptedMessageHandlerTest.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index 1fcec4293d..7bea6317de 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -1267,7 +1267,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi { ActiveMQClientLogger.LOGGER.errorCallingEnd(t); // This could occur if the TM interrupts the thread - XAException xaException = new XAException(XAException.XAER_RMERR); + XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(t); throw xaException; } @@ -1293,7 +1293,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi catch (Throwable t) { // This could occur if the TM interrupts the thread - XAException xaException = new XAException(XAException.XAER_RMERR); + XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(t); throw xaException; } @@ -1314,7 +1314,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi catch (Throwable t) { // This could occur if the TM interrupts the thread - XAException xaException = new XAException(XAException.XAER_RMERR); + XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(t); throw xaException; } @@ -1331,7 +1331,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi catch (Throwable t) { // This could occur if the TM interrupts the thread - XAException xaException = new XAException(XAException.XAER_RMERR); + XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(t); throw xaException; } @@ -1430,7 +1430,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi catch (Throwable t) { // This could occur if the TM interrupts the thread - XAException xaException = new XAException(XAException.XAER_RMERR); + XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(t); throw xaException; } @@ -1443,7 +1443,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi ActiveMQClientLogger.LOGGER.errorDuringPrepare(e); // This should never occur - XAException xaException = new XAException(XAException.XAER_RMERR); + XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(e); throw xaException; } @@ -1452,7 +1452,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi ActiveMQClientLogger.LOGGER.errorDuringPrepare(t); // This could occur if the TM interrupts the thread - XAException xaException = new XAException(XAException.XAER_RMERR); + XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(t); throw xaException; } @@ -1475,7 +1475,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi catch (Throwable t) { // This could occur if the TM interrupts the thread - XAException xaException = new XAException(XAException.XAER_RMERR); + XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(t); throw xaException; } @@ -1537,14 +1537,14 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } // This should never occur - XAException xaException = new XAException(XAException.XAER_RMERR); + XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(e); throw xaException; } catch (Throwable t) { // This could occur if the TM interrupts the thread - XAException xaException = new XAException(XAException.XAER_RMERR); + XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(t); throw xaException; } @@ -1585,21 +1585,21 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi catch (Throwable t) { // This could occur if the TM interrupts the thread - XAException xaException = new XAException(XAException.XAER_RMERR); + XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(t); throw xaException; } } // This should never occur - XAException xaException = new XAException(XAException.XAER_RMERR); + XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(e); throw xaException; } catch (Throwable t) { // This could occur if the TM interrupts the thread - XAException xaException = new XAException(XAException.XAER_RMERR); + XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(t); throw xaException; } @@ -1744,7 +1744,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi if (!xa) { ActiveMQClientLogger.LOGGER.sessionNotXA(); - throw new XAException(XAException.XAER_RMERR); + throw new XAException(XAException.XAER_RMFAIL); } } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransaction.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransaction.java index e179c8183a..afd734e495 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransaction.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransaction.java @@ -226,7 +226,7 @@ public abstract class AMQTransaction // to execute properly. getLog().warn("PRE COMMIT FAILED: ", e); XAException xae = new XAException("PRE COMMIT FAILED"); - xae.errorCode = XAException.XAER_RMERR; + xae.errorCode = XAException.XAER_RMFAIL; xae.initCause(e); throw xae; } @@ -245,7 +245,7 @@ public abstract class AMQTransaction // to execute properly. getLog().warn("POST COMMIT FAILED: ", e); XAException xae = new XAException("POST COMMIT FAILED"); - xae.errorCode = XAException.XAER_RMERR; + xae.errorCode = XAException.XAER_RMFAIL; xae.initCause(e); throw xae; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 0262a543fa..3393cc212f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -1051,7 +1051,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { e.printStackTrace(); - throw new ActiveMQXAException(XAException.XAER_RMERR); + throw new ActiveMQXAException(XAException.XAER_RMFAIL); } } else diff --git a/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/ActiveMQXAResourceWrapper.java b/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/ActiveMQXAResourceWrapper.java index 2dde46b395..9c886bd7ad 100644 --- a/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/ActiveMQXAResourceWrapper.java +++ b/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/ActiveMQXAResourceWrapper.java @@ -313,7 +313,7 @@ public class ActiveMQXAResourceWrapper implements XAResource, SessionFailureList else { XAException xae = new XAException("Error trying to connect to any providers for xa recovery"); - xae.errorCode = XAException.XAER_RMERR; + xae.errorCode = XAException.XAER_RMFAIL; if (error != null) { xae.initCause(error); diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/InterruptedMessageHandlerTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/InterruptedMessageHandlerTest.java new file mode 100644 index 0000000000..e5d638f6e2 --- /dev/null +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/InterruptedMessageHandlerTest.java @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.extras.byteman; + +import javax.jms.Message; +import javax.resource.ResourceException; +import javax.transaction.SystemException; +import javax.transaction.Transaction; +import javax.transaction.TransactionManager; +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; +import java.lang.reflect.Method; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import com.arjuna.ats.arjuna.coordinator.TransactionReaper; +import com.arjuna.ats.arjuna.coordinator.TwoPhaseOutcome; +import com.arjuna.ats.arjuna.coordinator.TxControl; +import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter; +import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec; +import org.apache.activemq.artemis.tests.integration.ra.ActiveMQRATestBase; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMRules; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(BMUnitRunner.class) +public class InterruptedMessageHandlerTest extends ActiveMQRATestBase +{ + + protected boolean usePersistence() + { + return true; + } + + @Override + public boolean useSecurity() + { + return false; + } + + @Test + @BMRules( + rules = { @BMRule( + name = "throw ActiveMQException(CONNETION_TIMEOUT) during rollback", + targetClass = "org.apache.activemq.artemis.core.client.impl.ClientSessionImpl", + targetMethod = "flushAcks", + targetLocation = "AFTER INVOKE flushAcks", + action = "org.apache.activemq.artemis.tests.extras.byteman.InterruptedMessageHandlerTest.throwActiveMQQExceptionConnectionTimeout();"), + @BMRule( + name = "check that outcome of XA transaction is TwoPhaseOutcome.FINISH_ERROR=8", + targetClass = "com.arjuna.ats.internal.jta.resources.arjunacore.XAResourceRecord", + targetMethod = "topLevelAbort", + targetLocation = "AT EXIT", + action = "org.apache.activemq.artemis.tests.extras.byteman.InterruptedMessageHandlerTest.assertTxOutComeIsOfStatusFinishedError($!);") }) + public void testSimpleMessageReceivedOnQueueTwoPhaseFailPrepareByConnectionTimout() throws Exception + { + ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter(); + resourceAdapter = qResourceAdapter; + + + MyBootstrapContext ctx = new MyBootstrapContext(); + + qResourceAdapter.setConnectorClassName(NETTY_CONNECTOR_FACTORY); + qResourceAdapter.start(ctx); + + + ActiveMQActivationSpec spec = new ActiveMQActivationSpec(); + spec.setMaxSession(1); + spec.setCallTimeout(1000L); + spec.setResourceAdapter(qResourceAdapter); + spec.setUseJNDI(false); + spec.setDestinationType("javax.jms.Queue"); + spec.setDestination(MDBQUEUE); + + CountDownLatch latch = new CountDownLatch(1); + + XADummyEndpointWithDummyXAResourceFailEnd endpoint = new XADummyEndpointWithDummyXAResourceFailEnd(latch, true); + + DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, true); + + qResourceAdapter.endpointActivation(endpointFactory, spec); + + ClientSession session = locator.createSessionFactory().createSession(); + + ClientProducer clientProducer = session.createProducer(MDBQUEUEPREFIXED); + + ClientMessage message = session.createMessage(true); + + message.getBodyBuffer().writeString("teststring"); + + clientProducer.send(message); + + session.close(); + + latch.await(5, TimeUnit.SECONDS); + + assertNotNull(endpoint.lastMessage); + assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(), "teststring"); + + qResourceAdapter.endpointDeactivation(endpointFactory, spec); + + qResourceAdapter.stop(); + + server.stop(); + + assertEquals("Two phase outcome must be of TwoPhaseOutcome.FINISH_ERROR.", TwoPhaseOutcome.FINISH_ERROR, + txTwoPhaseOutCome.intValue()); + + } + + static volatile ActiveMQResourceAdapter resourceAdapter; + static boolean resourceAdapterStopped = false; + + public static void interrupt() throws InterruptedException + { + if (!resourceAdapterStopped) + { + resourceAdapter.stop(); + resourceAdapterStopped = true; + throw new InterruptedException("foo"); + } + } + + public static void throwActiveMQQExceptionConnectionTimeout() throws ActiveMQException, XAException + { + StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace(); + for (StackTraceElement element : stackTraceElements) + { + if (element.getClassName().contains("ClientSessionImpl") && element.getMethodName().contains("rollback")) + { + throw new ActiveMQException(ActiveMQExceptionType.CONNECTION_TIMEDOUT); + } + } + } + + static Integer txTwoPhaseOutCome = null; + + public static void assertTxOutComeIsOfStatusFinishedError(int txOutCome) + { + // check only first trigger of byteman rule + if (txTwoPhaseOutCome == null) + { + txTwoPhaseOutCome = Integer.valueOf(txOutCome); + } + } + + Transaction currentTX; + + public class XADummyEndpoint extends DummyMessageEndpoint + { + final boolean twoPhase; + ClientSession session; + int afterDeliveryCounts = 0; + + public XADummyEndpoint(CountDownLatch latch, boolean twoPhase) throws SystemException + { + super(latch); + this.twoPhase = twoPhase; + try + { + session = locator.createSessionFactory().createSession(true, false, false); + } + catch (Throwable e) + { + throw new RuntimeException(e); + } + } + + @Override + public void beforeDelivery(Method method) throws NoSuchMethodException, ResourceException + { + super.beforeDelivery(method); + try + { + DummyTMLocator.tm.begin(); + currentTX = DummyTMLocator.tm.getTransaction(); + currentTX.enlistResource(xaResource); + if (twoPhase) + { + currentTX.enlistResource(new DummyXAResource()); + } + } + catch (Throwable e) + { + throw new RuntimeException(e.getMessage(), e); + } + } + + public void onMessage(Message message) + { + super.onMessage(message); + } + + @Override + public void afterDelivery() throws ResourceException + { + afterDeliveryCounts++; + try + { + currentTX.commit(); + } + catch (Throwable e) + { + // its unsure as to whether the EJB/JCA layer will handle this or throw it to us, + // either way we don't do anything else so its fine just to throw. + // NB this will only happen with 2 phase commit + throw new RuntimeException(e); + } + super.afterDelivery(); + } + } + + @Before + public void setUp() throws Exception + { + resourceAdapter = null; + resourceAdapterStopped = false; + super.setUp(); + DummyTMLocator.startTM(); + } + + @After + public void tearDown() throws Exception + { + DummyTMLocator.stopTM(); + super.tearDown(); + } + + public static class DummyTMLocator + { + public static TransactionManagerImple tm; + + public static void stopTM() + { + try + { + TransactionReaper.terminate(true); + TxControl.disable(true); + } + catch (Exception e) + { + e.printStackTrace(); + } + tm = null; + } + + public static void startTM() + { + tm = new TransactionManagerImple(); + TxControl.enable(); + } + + public TransactionManager getTM() + { + return tm; + } + } + + static class DummyXAResource implements XAResource + { + @Override + public void commit(Xid xid, boolean b) throws XAException + { + + } + + @Override + public void end(Xid xid, int i) throws XAException + { + + } + + @Override + public void forget(Xid xid) throws XAException + { + + } + + @Override + public int getTransactionTimeout() throws XAException + { + return 0; + } + + @Override + public boolean isSameRM(XAResource xaResource) throws XAException + { + return false; + } + + @Override + public int prepare(Xid xid) throws XAException + { + return 0; + } + + @Override + public Xid[] recover(int i) throws XAException + { + return new Xid[0]; + } + + @Override + public void rollback(Xid xid) throws XAException + { + + } + + @Override + public boolean setTransactionTimeout(int i) throws XAException + { + return false; + } + + @Override + public void start(Xid xid, int i) throws XAException + { + + } + } + + public class XADummyEndpointWithDummyXAResourceFailEnd extends XADummyEndpoint + { + + public XADummyEndpointWithDummyXAResourceFailEnd(CountDownLatch latch, boolean twoPhase) throws SystemException + { + super(latch, twoPhase); + } + + @Override + public void beforeDelivery(Method method) throws NoSuchMethodException, ResourceException + { + try + { + DummyTMLocator.tm.begin(); + currentTX = DummyTMLocator.tm.getTransaction(); + currentTX.enlistResource(xaResource); + if (twoPhase) + { + currentTX.enlistResource(new DummyXAResourceFailEnd()); + } + } + catch (Throwable e) + { + throw new RuntimeException(e.getMessage(), e); + } + } + + } + + static class DummyXAResourceFailEnd extends DummyXAResource + { + @Override + public void end(Xid xid, int i) throws XAException + { + throw new XAException(XAException.XAER_RMFAIL); + } + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionCloseTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionCloseTest.java index a5abb5239c..7f3a80ccd5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionCloseTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionCloseTest.java @@ -165,7 +165,7 @@ public class SessionCloseTest extends ActiveMQTestBase } }); - ActiveMQTestBase.expectXAException(XAException.XAER_RMERR, new ActiveMQAction() + ActiveMQTestBase.expectXAException(XAException.XAER_RMFAIL, new ActiveMQAction() { public void run() throws XAException { @@ -173,7 +173,7 @@ public class SessionCloseTest extends ActiveMQTestBase } }); - ActiveMQTestBase.expectXAException(XAException.XAER_RMERR, new ActiveMQAction() + ActiveMQTestBase.expectXAException(XAException.XAER_RMFAIL, new ActiveMQAction() { public void run() throws XAException { @@ -181,7 +181,7 @@ public class SessionCloseTest extends ActiveMQTestBase } }); - ActiveMQTestBase.expectXAException(XAException.XAER_RMERR, new ActiveMQAction() + ActiveMQTestBase.expectXAException(XAException.XAER_RMFAIL, new ActiveMQAction() { public void run() throws XAException { @@ -189,7 +189,7 @@ public class SessionCloseTest extends ActiveMQTestBase } }); - ActiveMQTestBase.expectXAException(XAException.XAER_RMERR, new ActiveMQAction() + ActiveMQTestBase.expectXAException(XAException.XAER_RMFAIL, new ActiveMQAction() { public void run() throws XAException { @@ -197,7 +197,7 @@ public class SessionCloseTest extends ActiveMQTestBase } }); - ActiveMQTestBase.expectXAException(XAException.XAER_RMERR, new ActiveMQAction() + ActiveMQTestBase.expectXAException(XAException.XAER_RMFAIL, new ActiveMQAction() { public void run() throws XAException { @@ -205,7 +205,7 @@ public class SessionCloseTest extends ActiveMQTestBase } }); - ActiveMQTestBase.expectXAException(XAException.XAER_RMERR, new ActiveMQAction() + ActiveMQTestBase.expectXAException(XAException.XAER_RMFAIL, new ActiveMQAction() { public void run() throws XAException {