diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index 6134784ebd..0a24022179 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -532,6 +532,10 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi rollbackOnly = false; } + public void markRollbackOnly() { + rollbackOnly = true; + } + public ClientMessage createMessage(final byte type, final boolean durable, final long expiration, @@ -1036,7 +1040,12 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi // we should never throw rollback if we have already prepared if (rollbackOnly) { - ActiveMQClientLogger.LOGGER.commitAfterFailover(); + if (onePhase) { + throw new XAException(XAException.XAER_RMFAIL); + } + else { + ActiveMQClientLogger.LOGGER.commitAfterFailover(); + } } // Note - don't need to flush acks since the previous end would have diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java index 06d60249d2..cd697c0793 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java @@ -93,6 +93,8 @@ public interface ClientSessionInternal extends ClientSession { void resetIfNeeded() throws ActiveMQException; + void markRollbackOnly(); + /** * This is used internally to control and educate the user * about using the thread boundaries properly. diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java index 2d6f4a4795..4d72dc93e2 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java @@ -141,6 +141,10 @@ public class DelegatingSession implements ClientSessionInternal { session.close(); } + public void markRollbackOnly() { + session.markRollbackOnly(); + } + public void commit() throws ActiveMQException { session.commit(); } diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java index f6a85355d2..82eb50d3b9 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java @@ -38,9 +38,9 @@ import org.apache.activemq.artemis.api.core.client.MessageHandler; import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; -import org.apache.activemq.artemis.ra.ActiveMQRALogger; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.artemis.jms.client.ActiveMQMessage; +import org.apache.activemq.artemis.ra.ActiveMQRALogger; import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter; import org.apache.activemq.artemis.service.extensions.ServiceUtils; import org.apache.activemq.artemis.service.extensions.xa.ActiveMQXAResourceWrapper; @@ -292,6 +292,11 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList if (activation.getActivationSpec().getTransactionTimeout() > 0 && tm != null) { tm.setTransactionTimeout(activation.getActivationSpec().getTransactionTimeout()); } + + if (trace) { + ActiveMQRALogger.LOGGER.trace("HornetQMessageHandler::calling beforeDelivery on message " + message); + } + endpoint.beforeDelivery(ActiveMQActivation.ONMESSAGE); beforeDelivery = true; msg.doBeforeReceive(); @@ -299,13 +304,17 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList //In the transacted case the message must be acked *before* onMessage is called if (transacted) { - message.acknowledge(); + message.individualAcknowledge(); } ((MessageListener) endpoint).onMessage(msg); if (!transacted) { - message.acknowledge(); + message.individualAcknowledge(); + } + + if (trace) { + ActiveMQRALogger.LOGGER.trace("HornetQMessageHandler::calling afterDelivery on message " + message); } try { @@ -313,6 +322,10 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList } catch (ResourceException e) { ActiveMQRALogger.LOGGER.unableToCallAfterDelivery(e); + // If we get here, The TX was already rolled back + // However we must do some stuff now to make sure the client message buffer is cleared + // so we mark this as rollbackonly + session.markRollbackOnly(); return; } if (useLocalTx) { @@ -340,13 +353,6 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList } catch (Exception e1) { ActiveMQRALogger.LOGGER.warn("unnable to clear the transaction", e1); - try { - session.rollback(); - } - catch (ActiveMQException e2) { - ActiveMQRALogger.LOGGER.warn("Unable to rollback", e2); - return; - } } } @@ -369,6 +375,10 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList ActiveMQRALogger.LOGGER.unableToRollbackTX(); } } + + // This is to make sure we will issue a rollback after failures + // so that would cleanup consumer buffers among other things + session.markRollbackOnly(); } finally { try { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index ee60a1481e..a5b3622864 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -45,8 +45,8 @@ import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.ManagementHelper; -import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.filter.Filter; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.message.impl.MessageImpl; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PagedReference; @@ -58,10 +58,10 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl; import org.apache.activemq.artemis.core.remoting.server.RemotingService; +import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.HandleStatus; -import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; @@ -1548,7 +1548,7 @@ public class QueueImpl implements Queue { Binding binding = postOffice.getBinding(originalMessageQueue); if (binding != null && binding instanceof LocalQueueBinding) { - targetQueue = ((LocalQueueBinding)binding).getID(); + targetQueue = ((LocalQueueBinding) binding).getID(); queues.put(originalMessageQueue, targetQueue); } } @@ -1562,12 +1562,10 @@ public class QueueImpl implements Queue { } - } } }); - } public synchronized boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception { @@ -2025,6 +2023,9 @@ public class QueueImpl implements Queue { private void internalAddRedistributor(final Executor executor) { // create the redistributor only once if there are no local consumers if (consumerSet.isEmpty() && redistributor == null) { + if (isTrace) { + ActiveMQServerLogger.LOGGER.trace("QueueImpl::Adding redistributor on queue " + this.toString()); + } redistributor = new Redistributor(this, storageManager, postOffice, executor, QueueImpl.REDISTRIBUTOR_BATCH_SIZE); consumerList.add(new ConsumerHolder(redistributor)); @@ -2103,7 +2104,7 @@ public class QueueImpl implements Queue { final MessageReference ref, final boolean expiry, final boolean rejectDuplicate, - final long ... queueIDs) throws Exception { + final long... queueIDs) throws Exception { ServerMessage copyMessage = makeCopy(ref, expiry); copyMessage.setAddress(toAddress); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 8cc9912914..ae30549f36 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -380,7 +380,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { @Override public void close(final boolean failed) throws Exception { if (isTrace) { - ActiveMQServerLogger.LOGGER.trace("ServerConsumerImpl::" + this + " being closed with failed=" + failed, new Exception("trace")); + ActiveMQServerLogger.LOGGER.trace("ServerConsumerImpl::" + this + " being closed with failed=" + failed, new Exception("trace")); } callback.removeReadyListener(this); @@ -405,7 +405,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { MessageReference ref = iter.next(); if (isTrace) { - ActiveMQServerLogger.LOGGER.trace("ServerConsumerImpl::" + this + " cancelling reference " + ref); + ActiveMQServerLogger.LOGGER.trace("ServerConsumerImpl::" + this + " cancelling reference " + ref); } ref.getQueue().cancel(tx, ref, true); @@ -662,14 +662,20 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { MessageReference ref; do { - ref = deliveringRefs.poll(); + synchronized (lock) { + ref = deliveringRefs.poll(); + } if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) { ActiveMQServerLogger.LOGGER.trace("ACKing ref " + ref + " on tx= " + tx + ", consumer=" + this); } if (ref == null) { - throw ActiveMQMessageBundle.BUNDLE.consumerNoReference(id, messageID, messageQueue.getName()); + ActiveMQIllegalStateException ils = ActiveMQMessageBundle.BUNDLE.consumerNoReference(id, messageID, messageQueue.getName()); + if (tx != null) { + tx.markAsRollbackOnly(ils); + } + throw ils; } ackReference(tx, ref); @@ -719,7 +725,11 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { MessageReference ref = removeReferenceByID(messageID); if (ref == null) { - throw new IllegalStateException("Cannot find ref to ack " + messageID); + ActiveMQIllegalStateException ils = ActiveMQMessageBundle.BUNDLE.consumerNoReference(id, messageID, messageQueue.getName()); + if (tx != null) { + tx.markAsRollbackOnly(ils); + } + throw ils; } ackReference(tx, ref); @@ -752,23 +762,29 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { // Expiries can come in out of sequence with respect to delivery order - Iterator iter = deliveringRefs.iterator(); - - MessageReference ref = null; - - while (iter.hasNext()) { - MessageReference theRef = iter.next(); - - if (theRef.getMessage().getMessageID() == messageID) { - iter.remove(); - - ref = theRef; - - break; + synchronized (lock) { + // This is an optimization, if the reference is the first one, we just poll it. + if (deliveringRefs.peek().getMessage().getMessageID() == messageID) { + return deliveringRefs.poll(); } - } - return ref; + Iterator iter = deliveringRefs.iterator(); + + MessageReference ref = null; + + while (iter.hasNext()) { + MessageReference theRef = iter.next(); + + if (theRef.getMessage().getMessageID() == messageID) { + iter.remove(); + + ref = theRef; + + break; + } + } + return ref; + } } public void readyForWriting(final boolean ready) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 3cd1d7b35c..16ec608b48 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; @@ -668,18 +669,22 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } public void acknowledge(final long consumerID, final long messageID) throws Exception { - ServerConsumer consumer = consumers.get(consumerID); - - if (consumer == null) { - throw ActiveMQMessageBundle.BUNDLE.consumerDoesntExist(consumerID); - } + ServerConsumer consumer = findConsumer(consumerID); if (tx != null && tx.getState() == State.ROLLEDBACK) { // JBPAPP-8845 - if we let stuff to be acked on a rolled back TX, we will just // have these messages to be stuck on the limbo until the server is restarted // The tx has already timed out, so we need to ack and rollback immediately Transaction newTX = newTransaction(); - consumer.acknowledge(newTX, messageID); + try { + consumer.acknowledge(newTX, messageID); + } + catch (Exception e) { + // just ignored + // will log it just in case + ActiveMQServerLogger.LOGGER.debug("Ignored exception while acking messageID " + messageID + + " on a rolledback TX", e); + } newTX.rollback(); } else { @@ -687,9 +692,25 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } } - public void individualAcknowledge(final long consumerID, final long messageID) throws Exception { + private ServerConsumer findConsumer(long consumerID) throws Exception { ServerConsumer consumer = consumers.get(consumerID); + if (consumer == null) { + Transaction currentTX = tx; + ActiveMQIllegalStateException exception = ActiveMQMessageBundle.BUNDLE.consumerDoesntExist(consumerID); + + if (currentTX != null) { + currentTX.markAsRollbackOnly(exception); + } + + throw exception; + } + return consumer; + } + + public void individualAcknowledge(final long consumerID, final long messageID) throws Exception { + ServerConsumer consumer = findConsumer(consumerID); + if (tx != null && tx.getState() == State.ROLLEDBACK) { // JBPAPP-8845 - if we let stuff to be acked on a rolled back TX, we will just // have these messages to be stuck on the limbo until the server is restarted diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java index 739578ef2e..d500f6b572 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java @@ -14,12 +14,20 @@ package org.apache.activemq.artemis.tests.extras.jms.ra; import javax.jms.Message; import javax.resource.ResourceException; +import javax.resource.spi.LocalTransactionException; import javax.resource.spi.UnavailableException; import javax.resource.spi.endpoint.MessageEndpoint; import javax.resource.spi.endpoint.MessageEndpointFactory; +import javax.transaction.HeuristicMixedException; +import javax.transaction.HeuristicRollbackException; +import javax.transaction.RollbackException; +import javax.transaction.Status; +import javax.transaction.SystemException; import javax.transaction.Transaction; import javax.transaction.TransactionManager; import javax.transaction.xa.XAResource; +import java.io.PrintWriter; +import java.io.StringWriter; import java.lang.reflect.Method; import java.util.LinkedList; import java.util.List; @@ -37,12 +45,13 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter; +import org.apache.activemq.artemis.ra.inflow.ActiveMQActivation; import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; -import org.apache.activemq.artemis.tests.extras.jms.bridge.TransactionManagerLocatorImpl; import org.apache.activemq.artemis.tests.integration.ra.ActiveMQRATestBase; import org.apache.activemq.artemis.tests.util.RandomUtil; import org.junit.After; @@ -61,6 +70,10 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase ServerLocator nettyLocator; + private volatile boolean playTXTimeouts = true; + private volatile boolean playServerClosingSession = true; + private volatile boolean playServerClosingConsumer = true; + @Before public void setUp() throws Exception { nettyLocator = createNettyNonHALocator(); @@ -91,7 +104,7 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase @Test public void testReconnectMDBNoMessageLoss() throws Exception { AddressSettings settings = new AddressSettings(); - settings.setRedeliveryDelay(1000); + settings.setRedeliveryDelay(100); settings.setMaxDeliveryAttempts(-1); server.getAddressSettingsRepository().clear(); server.getAddressSettingsRepository().addMatch("#", settings); @@ -125,8 +138,9 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase qResourceAdapter.endpointActivation(endpointFactory, spec); Assert.assertEquals(1, resourceAdapter.getActivations().values().size()); + ActiveMQActivation activation = resourceAdapter.getActivations().values().toArray(new ActiveMQActivation[1])[0]; - final int NUMBER_OF_MESSAGES = 3000; + final int NUMBER_OF_MESSAGES = 1000; Thread producer = new Thread() { public void run() { @@ -178,18 +192,12 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase return; } - List serverSessions = new LinkedList<>(); - - for (ServerSession session : server.getSessions()) { - if (session.getMetaData("resource-adapter") != null) { - serverSessions.add(session); - } - } + List serverSessions = lookupServerSessions("resource-adapter"); System.err.println("Contains " + serverSessions.size() + " RA sessions"); if (serverSessions.size() != NUMBER_OF_SESSIONS) { - System.err.println("the server was supposed to have " + NUMBER_OF_SESSIONS + " RA Sessions but it only contained accordingly to the meta-data"); + System.err.println("the server was supposed to have " + NUMBER_OF_MESSAGES + " RA Sessions but it only contained accordingly to the meta-data"); metaDataFailed.set(true); } else if (serverSessions.size() == NUMBER_OF_SESSIONS) { @@ -197,12 +205,29 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase metaDataFailed.set(false); } - if (serverSessions.size() > 0) { + if (playServerClosingSession && serverSessions.size() > 0) { int randomBother = RandomUtil.randomInterval(0, serverSessions.size() - 1); System.out.println("bugging session " + randomBother); - RemotingConnection connection = serverSessions.get(randomBother).getRemotingConnection(); + ServerSession serverSession = serverSessions.get(randomBother); + + if (playServerClosingConsumer && RandomUtil.randomBoolean()) { + // will play this randomly, only half of the times + for (ServerConsumer consumer : serverSession.getServerConsumers()) { + try { + // Simulating a rare race that could happen in production + // where the consumer is closed while things are still happening + consumer.close(true); + Thread.sleep(100); + } + catch (Exception e) { + e.printStackTrace(); + } + } + } + + RemotingConnection connection = serverSession.getRemotingConnection(); connection.fail(new ActiveMQException("failed at random " + randomBother)); } @@ -221,11 +246,21 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase ClientConsumer consumer = session.createConsumer("jms.queue.outQueue"); for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { - ClientMessage message = consumer.receive(5000); + ClientMessage message = consumer.receive(60000); if (message == null) { break; } + if (i == NUMBER_OF_MESSAGES * 0.90) { + System.out.println("Disabled failures at " + i); + playTXTimeouts = false; + playServerClosingSession = false; + playServerClosingConsumer = false; + + } + + System.out.println("Received " + i + " messages"); + Assert.assertNotNull(message); message.acknowledge(); @@ -247,16 +282,19 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase session.commit(); Assert.assertNull(consumer.receiveImmediate()); + StringWriter writer = new StringWriter(); + PrintWriter out = new PrintWriter(writer); + boolean failed = false; for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { AtomicInteger atomicInteger = mapCounter.get(Integer.valueOf(i)); if (atomicInteger == null) { - System.out.println("didn't receive message with i=" + i); + out.println("didn't receive message with i=" + i); failed = true; } else if (atomicInteger.get() > 1) { - System.out.println("message with i=" + i + " received " + atomicInteger.get() + " times"); + out.println("message with i=" + i + " received " + atomicInteger.get() + " times"); failed = true; } } @@ -266,15 +304,34 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase buggerThread.join(); producer.join(); - Assert.assertFalse("There was meta-data failures, some sessions didn't reconnect properly", metaDataFailed.get()); + qResourceAdapter.stop(); + + session.close(); + + if (failed) { + for (int i = 0; i < 10; i++) { + System.out.println("----------------------------------------------------"); + } + System.out.println(writer.toString()); + } Assert.assertFalse(failed); System.out.println("Received " + NUMBER_OF_MESSAGES + " messages"); - qResourceAdapter.stop(); + Assert.assertFalse("There was meta-data failures, some sessions didn't reconnect properly", metaDataFailed.get()); - session.close(); + } + + private List lookupServerSessions(String parameter) { + List serverSessions = new LinkedList(); + + for (ServerSession session : server.getSessions()) { + if (session.getMetaData(parameter) != null) { + serverSessions.add(session); + } + } + return serverSessions; } protected class TestEndpointFactory implements MessageEndpointFactory { @@ -330,17 +387,10 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase catch (Throwable e) { throw new RuntimeException(e.getMessage(), e); } + } public void onMessage(Message message) { - // try - // { - // System.out.println(Thread.currentThread().getName() + "**** onMessage enter " + message.getIntProperty("i")); - // } - // catch (Exception e) - // { - // } - Integer value = 0; try { @@ -355,9 +405,15 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase try { currentTX.enlistResource(endpointSession); ClientMessage message1 = endpointSession.createMessage(true); - message1.putIntProperty("i", message.getIntProperty("i")); + message1.putIntProperty("i", value); producer.send(message1); currentTX.delistResource(endpointSession, XAResource.TMSUCCESS); + + if (playTXTimeouts) { + if (RandomUtil.randomInterval(0, 5) == 3) { + Thread.sleep(2000); + } + } } catch (Exception e) { e.printStackTrace(); @@ -373,11 +429,26 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase @Override public void afterDelivery() throws ResourceException { + // This is a copy & paste of what the Application server would do here try { - DummyTMLocator.tm.commit(); - // currentTX.commit(); + if (currentTX.getStatus() == Status.STATUS_MARKED_ROLLBACK) { + DummyTMLocator.tm.rollback(); + } + else { + DummyTMLocator.tm.commit(); + } } - catch (Throwable e) { + catch (HeuristicMixedException e) { + throw new LocalTransactionException(e); + } + catch (SystemException e) { + throw new LocalTransactionException(e); + } + catch (HeuristicRollbackException e) { + throw new LocalTransactionException(e); + } + catch (RollbackException e) { + throw new LocalTransactionException(e); } super.afterDelivery(); } @@ -389,7 +460,6 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase public static void stopTM() { try { - TransactionManagerLocatorImpl.setTransactionManager(null); TransactionReaper.terminate(true); TxControl.disable(true); } @@ -401,7 +471,6 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase public static void startTM() { tm = new TransactionManagerImple(); - TransactionManagerLocatorImpl.setTransactionManager(tm); TxControl.enable(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java index 9801a83da8..42d5211a09 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java @@ -1170,7 +1170,7 @@ public class FailoverTest extends FailoverTestBase { crash(session); try { - session.commit(xid, true); + session.commit(xid, false); Assert.fail("Should throw exception"); } @@ -1374,7 +1374,7 @@ public class FailoverTest extends FailoverTestBase { crash(session2); try { - session2.commit(xid, true); + session2.commit(xid, false); Assert.fail("Should throw exception"); }