diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index 1fcec4293d..7bea6317de 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -1267,7 +1267,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi { ActiveMQClientLogger.LOGGER.errorCallingEnd(t); // This could occur if the TM interrupts the thread - XAException xaException = new XAException(XAException.XAER_RMERR); + XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(t); throw xaException; } @@ -1293,7 +1293,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi catch (Throwable t) { // This could occur if the TM interrupts the thread - XAException xaException = new XAException(XAException.XAER_RMERR); + XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(t); throw xaException; } @@ -1314,7 +1314,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi catch (Throwable t) { // This could occur if the TM interrupts the thread - XAException xaException = new XAException(XAException.XAER_RMERR); + XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(t); throw xaException; } @@ -1331,7 +1331,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi catch (Throwable t) { // This could occur if the TM interrupts the thread - XAException xaException = new XAException(XAException.XAER_RMERR); + XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(t); throw xaException; } @@ -1430,7 +1430,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi catch (Throwable t) { // This could occur if the TM interrupts the thread - XAException xaException = new XAException(XAException.XAER_RMERR); + XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(t); throw xaException; } @@ -1443,7 +1443,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi ActiveMQClientLogger.LOGGER.errorDuringPrepare(e); // This should never occur - XAException xaException = new XAException(XAException.XAER_RMERR); + XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(e); throw xaException; } @@ -1452,7 +1452,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi ActiveMQClientLogger.LOGGER.errorDuringPrepare(t); // This could occur if the TM interrupts the thread - XAException xaException = new XAException(XAException.XAER_RMERR); + XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(t); throw xaException; } @@ -1475,7 +1475,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi catch (Throwable t) { // This could occur if the TM interrupts the thread - XAException xaException = new XAException(XAException.XAER_RMERR); + XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(t); throw xaException; } @@ -1537,14 +1537,14 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } // This should never occur - XAException xaException = new XAException(XAException.XAER_RMERR); + XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(e); throw xaException; } catch (Throwable t) { // This could occur if the TM interrupts the thread - XAException xaException = new XAException(XAException.XAER_RMERR); + XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(t); throw xaException; } @@ -1585,21 +1585,21 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi catch (Throwable t) { // This could occur if the TM interrupts the thread - XAException xaException = new XAException(XAException.XAER_RMERR); + XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(t); throw xaException; } } // This should never occur - XAException xaException = new XAException(XAException.XAER_RMERR); + XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(e); throw xaException; } catch (Throwable t) { // This could occur if the TM interrupts the thread - XAException xaException = new XAException(XAException.XAER_RMERR); + XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(t); throw xaException; } @@ -1744,7 +1744,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi if (!xa) { ActiveMQClientLogger.LOGGER.sessionNotXA(); - throw new XAException(XAException.XAER_RMERR); + throw new XAException(XAException.XAER_RMFAIL); } } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransaction.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransaction.java index e179c8183a..afd734e495 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransaction.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransaction.java @@ -226,7 +226,7 @@ public abstract class AMQTransaction // to execute properly. getLog().warn("PRE COMMIT FAILED: ", e); XAException xae = new XAException("PRE COMMIT FAILED"); - xae.errorCode = XAException.XAER_RMERR; + xae.errorCode = XAException.XAER_RMFAIL; xae.initCause(e); throw xae; } @@ -245,7 +245,7 @@ public abstract class AMQTransaction // to execute properly. getLog().warn("POST COMMIT FAILED: ", e); XAException xae = new XAException("POST COMMIT FAILED"); - xae.errorCode = XAException.XAER_RMERR; + xae.errorCode = XAException.XAER_RMFAIL; xae.initCause(e); throw xae; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 0262a543fa..3393cc212f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -1051,7 +1051,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { e.printStackTrace(); - throw new ActiveMQXAException(XAException.XAER_RMERR); + throw new ActiveMQXAException(XAException.XAER_RMFAIL); } } else diff --git a/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/ActiveMQXAResourceWrapper.java b/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/ActiveMQXAResourceWrapper.java index 2dde46b395..9c886bd7ad 100644 --- a/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/ActiveMQXAResourceWrapper.java +++ b/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/ActiveMQXAResourceWrapper.java @@ -313,7 +313,7 @@ public class ActiveMQXAResourceWrapper implements XAResource, SessionFailureList else { XAException xae = new XAException("Error trying to connect to any providers for xa recovery"); - xae.errorCode = XAException.XAER_RMERR; + xae.errorCode = XAException.XAER_RMFAIL; if (error != null) { xae.initCause(error); diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/InterruptedMessageHandlerTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/InterruptedMessageHandlerTest.java new file mode 100644 index 0000000000..e5d638f6e2 --- /dev/null +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/InterruptedMessageHandlerTest.java @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.extras.byteman; + +import javax.jms.Message; +import javax.resource.ResourceException; +import javax.transaction.SystemException; +import javax.transaction.Transaction; +import javax.transaction.TransactionManager; +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; +import java.lang.reflect.Method; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import com.arjuna.ats.arjuna.coordinator.TransactionReaper; +import com.arjuna.ats.arjuna.coordinator.TwoPhaseOutcome; +import com.arjuna.ats.arjuna.coordinator.TxControl; +import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter; +import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec; +import org.apache.activemq.artemis.tests.integration.ra.ActiveMQRATestBase; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMRules; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(BMUnitRunner.class) +public class InterruptedMessageHandlerTest extends ActiveMQRATestBase +{ + + protected boolean usePersistence() + { + return true; + } + + @Override + public boolean useSecurity() + { + return false; + } + + @Test + @BMRules( + rules = { @BMRule( + name = "throw ActiveMQException(CONNETION_TIMEOUT) during rollback", + targetClass = "org.apache.activemq.artemis.core.client.impl.ClientSessionImpl", + targetMethod = "flushAcks", + targetLocation = "AFTER INVOKE flushAcks", + action = "org.apache.activemq.artemis.tests.extras.byteman.InterruptedMessageHandlerTest.throwActiveMQQExceptionConnectionTimeout();"), + @BMRule( + name = "check that outcome of XA transaction is TwoPhaseOutcome.FINISH_ERROR=8", + targetClass = "com.arjuna.ats.internal.jta.resources.arjunacore.XAResourceRecord", + targetMethod = "topLevelAbort", + targetLocation = "AT EXIT", + action = "org.apache.activemq.artemis.tests.extras.byteman.InterruptedMessageHandlerTest.assertTxOutComeIsOfStatusFinishedError($!);") }) + public void testSimpleMessageReceivedOnQueueTwoPhaseFailPrepareByConnectionTimout() throws Exception + { + ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter(); + resourceAdapter = qResourceAdapter; + + + MyBootstrapContext ctx = new MyBootstrapContext(); + + qResourceAdapter.setConnectorClassName(NETTY_CONNECTOR_FACTORY); + qResourceAdapter.start(ctx); + + + ActiveMQActivationSpec spec = new ActiveMQActivationSpec(); + spec.setMaxSession(1); + spec.setCallTimeout(1000L); + spec.setResourceAdapter(qResourceAdapter); + spec.setUseJNDI(false); + spec.setDestinationType("javax.jms.Queue"); + spec.setDestination(MDBQUEUE); + + CountDownLatch latch = new CountDownLatch(1); + + XADummyEndpointWithDummyXAResourceFailEnd endpoint = new XADummyEndpointWithDummyXAResourceFailEnd(latch, true); + + DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, true); + + qResourceAdapter.endpointActivation(endpointFactory, spec); + + ClientSession session = locator.createSessionFactory().createSession(); + + ClientProducer clientProducer = session.createProducer(MDBQUEUEPREFIXED); + + ClientMessage message = session.createMessage(true); + + message.getBodyBuffer().writeString("teststring"); + + clientProducer.send(message); + + session.close(); + + latch.await(5, TimeUnit.SECONDS); + + assertNotNull(endpoint.lastMessage); + assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(), "teststring"); + + qResourceAdapter.endpointDeactivation(endpointFactory, spec); + + qResourceAdapter.stop(); + + server.stop(); + + assertEquals("Two phase outcome must be of TwoPhaseOutcome.FINISH_ERROR.", TwoPhaseOutcome.FINISH_ERROR, + txTwoPhaseOutCome.intValue()); + + } + + static volatile ActiveMQResourceAdapter resourceAdapter; + static boolean resourceAdapterStopped = false; + + public static void interrupt() throws InterruptedException + { + if (!resourceAdapterStopped) + { + resourceAdapter.stop(); + resourceAdapterStopped = true; + throw new InterruptedException("foo"); + } + } + + public static void throwActiveMQQExceptionConnectionTimeout() throws ActiveMQException, XAException + { + StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace(); + for (StackTraceElement element : stackTraceElements) + { + if (element.getClassName().contains("ClientSessionImpl") && element.getMethodName().contains("rollback")) + { + throw new ActiveMQException(ActiveMQExceptionType.CONNECTION_TIMEDOUT); + } + } + } + + static Integer txTwoPhaseOutCome = null; + + public static void assertTxOutComeIsOfStatusFinishedError(int txOutCome) + { + // check only first trigger of byteman rule + if (txTwoPhaseOutCome == null) + { + txTwoPhaseOutCome = Integer.valueOf(txOutCome); + } + } + + Transaction currentTX; + + public class XADummyEndpoint extends DummyMessageEndpoint + { + final boolean twoPhase; + ClientSession session; + int afterDeliveryCounts = 0; + + public XADummyEndpoint(CountDownLatch latch, boolean twoPhase) throws SystemException + { + super(latch); + this.twoPhase = twoPhase; + try + { + session = locator.createSessionFactory().createSession(true, false, false); + } + catch (Throwable e) + { + throw new RuntimeException(e); + } + } + + @Override + public void beforeDelivery(Method method) throws NoSuchMethodException, ResourceException + { + super.beforeDelivery(method); + try + { + DummyTMLocator.tm.begin(); + currentTX = DummyTMLocator.tm.getTransaction(); + currentTX.enlistResource(xaResource); + if (twoPhase) + { + currentTX.enlistResource(new DummyXAResource()); + } + } + catch (Throwable e) + { + throw new RuntimeException(e.getMessage(), e); + } + } + + public void onMessage(Message message) + { + super.onMessage(message); + } + + @Override + public void afterDelivery() throws ResourceException + { + afterDeliveryCounts++; + try + { + currentTX.commit(); + } + catch (Throwable e) + { + // its unsure as to whether the EJB/JCA layer will handle this or throw it to us, + // either way we don't do anything else so its fine just to throw. + // NB this will only happen with 2 phase commit + throw new RuntimeException(e); + } + super.afterDelivery(); + } + } + + @Before + public void setUp() throws Exception + { + resourceAdapter = null; + resourceAdapterStopped = false; + super.setUp(); + DummyTMLocator.startTM(); + } + + @After + public void tearDown() throws Exception + { + DummyTMLocator.stopTM(); + super.tearDown(); + } + + public static class DummyTMLocator + { + public static TransactionManagerImple tm; + + public static void stopTM() + { + try + { + TransactionReaper.terminate(true); + TxControl.disable(true); + } + catch (Exception e) + { + e.printStackTrace(); + } + tm = null; + } + + public static void startTM() + { + tm = new TransactionManagerImple(); + TxControl.enable(); + } + + public TransactionManager getTM() + { + return tm; + } + } + + static class DummyXAResource implements XAResource + { + @Override + public void commit(Xid xid, boolean b) throws XAException + { + + } + + @Override + public void end(Xid xid, int i) throws XAException + { + + } + + @Override + public void forget(Xid xid) throws XAException + { + + } + + @Override + public int getTransactionTimeout() throws XAException + { + return 0; + } + + @Override + public boolean isSameRM(XAResource xaResource) throws XAException + { + return false; + } + + @Override + public int prepare(Xid xid) throws XAException + { + return 0; + } + + @Override + public Xid[] recover(int i) throws XAException + { + return new Xid[0]; + } + + @Override + public void rollback(Xid xid) throws XAException + { + + } + + @Override + public boolean setTransactionTimeout(int i) throws XAException + { + return false; + } + + @Override + public void start(Xid xid, int i) throws XAException + { + + } + } + + public class XADummyEndpointWithDummyXAResourceFailEnd extends XADummyEndpoint + { + + public XADummyEndpointWithDummyXAResourceFailEnd(CountDownLatch latch, boolean twoPhase) throws SystemException + { + super(latch, twoPhase); + } + + @Override + public void beforeDelivery(Method method) throws NoSuchMethodException, ResourceException + { + try + { + DummyTMLocator.tm.begin(); + currentTX = DummyTMLocator.tm.getTransaction(); + currentTX.enlistResource(xaResource); + if (twoPhase) + { + currentTX.enlistResource(new DummyXAResourceFailEnd()); + } + } + catch (Throwable e) + { + throw new RuntimeException(e.getMessage(), e); + } + } + + } + + static class DummyXAResourceFailEnd extends DummyXAResource + { + @Override + public void end(Xid xid, int i) throws XAException + { + throw new XAException(XAException.XAER_RMFAIL); + } + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionCloseTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionCloseTest.java index a5abb5239c..7f3a80ccd5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionCloseTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionCloseTest.java @@ -165,7 +165,7 @@ public class SessionCloseTest extends ActiveMQTestBase } }); - ActiveMQTestBase.expectXAException(XAException.XAER_RMERR, new ActiveMQAction() + ActiveMQTestBase.expectXAException(XAException.XAER_RMFAIL, new ActiveMQAction() { public void run() throws XAException { @@ -173,7 +173,7 @@ public class SessionCloseTest extends ActiveMQTestBase } }); - ActiveMQTestBase.expectXAException(XAException.XAER_RMERR, new ActiveMQAction() + ActiveMQTestBase.expectXAException(XAException.XAER_RMFAIL, new ActiveMQAction() { public void run() throws XAException { @@ -181,7 +181,7 @@ public class SessionCloseTest extends ActiveMQTestBase } }); - ActiveMQTestBase.expectXAException(XAException.XAER_RMERR, new ActiveMQAction() + ActiveMQTestBase.expectXAException(XAException.XAER_RMFAIL, new ActiveMQAction() { public void run() throws XAException { @@ -189,7 +189,7 @@ public class SessionCloseTest extends ActiveMQTestBase } }); - ActiveMQTestBase.expectXAException(XAException.XAER_RMERR, new ActiveMQAction() + ActiveMQTestBase.expectXAException(XAException.XAER_RMFAIL, new ActiveMQAction() { public void run() throws XAException { @@ -197,7 +197,7 @@ public class SessionCloseTest extends ActiveMQTestBase } }); - ActiveMQTestBase.expectXAException(XAException.XAER_RMERR, new ActiveMQAction() + ActiveMQTestBase.expectXAException(XAException.XAER_RMFAIL, new ActiveMQAction() { public void run() throws XAException { @@ -205,7 +205,7 @@ public class SessionCloseTest extends ActiveMQTestBase } }); - ActiveMQTestBase.expectXAException(XAException.XAER_RMERR, new ActiveMQAction() + ActiveMQTestBase.expectXAException(XAException.XAER_RMFAIL, new ActiveMQAction() { public void run() throws XAException {