diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 5c4dc66279..a08e8ab17f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -723,7 +723,7 @@ public interface ActiveMQServerLogger extends BasicLogger { @LogMessage(level = Logger.Level.WARN) @Message(id = 222103, value = "transaction with xid {0} timed out", format = Message.Format.MESSAGE_FORMAT) - void unexpectedXid(Xid xid); + void timedOutXID(Xid xid); @LogMessage(level = Logger.Level.WARN) @Message(id = 222104, value = "IO Error completing the transaction, code = {0}, message = {1}", format = Message.Format.MESSAGE_FORMAT) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/ResourceManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/ResourceManagerImpl.java index 31160aca74..0d420ffe2e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/ResourceManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/ResourceManagerImpl.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.transaction.impl; +import javax.transaction.xa.Xid; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -29,8 +30,6 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import javax.transaction.xa.Xid; - import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.transaction.ResourceManager; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -178,10 +177,13 @@ public class ResourceManagerImpl implements ResourceManager { long now = System.currentTimeMillis(); for (Transaction tx : transactions.values()) { + if (tx.hasTimedOut(now, defaultTimeoutSeconds)) { - transactions.remove(tx.getXid()); - ActiveMQServerLogger.LOGGER.unexpectedXid(tx.getXid()); - timedoutTransactions.add(tx); + Transaction removedTX = removeTransaction(tx.getXid()); + if (removedTX != null) { + ActiveMQServerLogger.LOGGER.timedOutXID(removedTX.getXid()); + timedoutTransactions.add(removedTX); + } } } @@ -194,7 +196,6 @@ public class ResourceManagerImpl implements ResourceManager { } } } - synchronized void setFuture(final Future future) { this.future = future; } @@ -209,6 +210,7 @@ public class ResourceManagerImpl implements ResourceManager { } + private static final class HeuristicCompletionHolder { public final boolean isCommit; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java index b71c7428b2..e991454b21 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java @@ -271,6 +271,7 @@ public class TransactionImpl implements Transaction { beforeRollback(); doRollback(); + state = State.ROLLEDBACK; // We use the Callback even for non persistence // If we are using non-persistence with replication, the replication manager will have @@ -283,7 +284,6 @@ public class TransactionImpl implements Transaction { public void done() { afterRollback(); - state = State.ROLLEDBACK; } }); } diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/TimeoutXATest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/TimeoutXATest.java new file mode 100644 index 0000000000..ef8a6fa09a --- /dev/null +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/TimeoutXATest.java @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.extras.byteman; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.XAConnection; +import javax.jms.XASession; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; +import java.util.concurrent.CountDownLatch; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMRules; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(BMUnitRunner.class) +public class TimeoutXATest extends ActiveMQTestBase { + + + protected ActiveMQServer server = null; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + server = createServer(createDefaultNettyConfig()); + server.getConfiguration().setTransactionTimeout(1000); + server.getConfiguration().setTransactionTimeoutScanPeriod(1100); + server.getConfiguration().setJournalSyncNonTransactional(false); + server.start(); + server.createQueue(SimpleString.toSimpleString("jms.queue.Queue1"), SimpleString.toSimpleString("jms.queue.Queue1"), null, true, false); + + removingTXEntered0 = new CountDownLatch(1); + removingTXAwait0 = new CountDownLatch(1); + removingTXEntered1 = new CountDownLatch(1); + removingTXAwait1 = new CountDownLatch(1); + entered = 0; + + enteredRollback = 0; + enteredRollbackLatch = new CountDownLatch(1); + waitingRollbackLatch = new CountDownLatch(1); + } + + static int entered; + static CountDownLatch removingTXEntered0; + static CountDownLatch removingTXAwait0; + static CountDownLatch removingTXEntered1; + static CountDownLatch removingTXAwait1; + + static int enteredRollback; + static CountDownLatch enteredRollbackLatch; + static CountDownLatch waitingRollbackLatch; + + + @Test + @BMRules( + rules = {@BMRule( + name = "removing TX", + targetClass = "org.apache.activemq.artemis.core.transaction.impl.ResourceManagerImpl", + targetMethod = "removeTransaction(javax.transaction.xa.Xid)", + targetLocation = "ENTRY", + helper = "org.apache.activemq.artemis.tests.extras.byteman.TimeoutXATest", + action = "removingTX()"), + @BMRule( + name = "afterRollback TX", + targetClass = "org.apache.activemq.artemis.core.transaction.impl.TransactionImpl", + targetMethod = "afterRollback()", + targetLocation = "ENTRY", + helper = "org.apache.activemq.artemis.tests.extras.byteman.TimeoutXATest", + action = "afterRollback()")}) + public void testTimeoutOnTX2() throws Exception { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); + XAConnection connection = (XAConnection)connectionFactory.createXAConnection(); + + Connection connction2 = connectionFactory.createConnection(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue("Queue1"); + + MessageProducer producer = session.createProducer(queue); + for (int i = 0; i < 10; i++) { + producer.send(session.createTextMessage("hello " + 1)); + } + + session.commit(); + + final XASession xaSession = connection.createXASession(); + final Xid xid = newXID(); + + xaSession.getXAResource().start(xid, XAResource.TMNOFLAGS); + MessageConsumer consumer = xaSession.createConsumer(queue); + connection.start(); + for (int i = 0; i < 10; i++) { + Assert.assertNotNull(consumer.receive(5000)); + } + xaSession.getXAResource().end(xid, XAResource.TMSUCCESS); + + final CountDownLatch latchStore = new CountDownLatch(1000); + + Thread storingThread = new Thread() { + public void run() { + try { + for (int i = 0; i < 100000; i++) { + latchStore.countDown(); + server.getStorageManager().storeDuplicateID(SimpleString.toSimpleString("crebis"), new byte[]{1}, server.getStorageManager().generateID()); + } + } + catch (Exception e) { + e.printStackTrace(); + } + } + }; + + removingTXEntered0.await(); + + Thread t = new Thread() { + public void run() { + try { + xaSession.getXAResource().rollback(xid); + } + catch (Exception e) { + e.printStackTrace(); + } + + } + }; + + t.start(); + + + removingTXEntered1.await(); + + storingThread.start(); + latchStore.await(); + + removingTXAwait1.countDown(); + + Thread.sleep(1000); + removingTXAwait0.countDown(); + + enteredRollbackLatch.await(); + + waitingRollbackLatch.countDown(); + + t.join(); + + consumer.close(); +// +// connction2.start(); +// + consumer = session.createConsumer(queue); + for (int i = 0; i < 10; i++) { + Assert.assertNotNull(consumer.receive(5000)); + } + Assert.assertNull(consumer.receiveNoWait()); +// session.commit(); +// session.close(); + connection.close(); + connction2.close(); + + } + + public void afterRollback() { + if (enteredRollback++ == 0) { + enteredRollbackLatch.countDown(); + try { + waitingRollbackLatch.await(); + } + catch (Throwable e) { + + } + } + + } + + public void removingTX() { + int xent = entered++; + + if (xent == 0) { + removingTXEntered0.countDown(); + try { + removingTXAwait0.await(); + } + catch (Throwable ignored) { + ignored.printStackTrace(); + } + } + else if (xent == 1) { + removingTXEntered1.countDown(); + try { + removingTXAwait1.await(); + } + catch (Throwable ignored) { + ignored.printStackTrace(); + } + } + + } +}