merge #242 - ARTEMIS-302 - more work about improving resilience of MDBs and XA

This commit is contained in:
Andy Taylor 2015-11-17 15:08:46 +00:00
commit 5b824e1bcf
9 changed files with 211 additions and 79 deletions

View File

@ -532,6 +532,10 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
rollbackOnly = false;
}
public void markRollbackOnly() {
rollbackOnly = true;
}
public ClientMessage createMessage(final byte type,
final boolean durable,
final long expiration,
@ -1036,7 +1040,12 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
// we should never throw rollback if we have already prepared
if (rollbackOnly) {
ActiveMQClientLogger.LOGGER.commitAfterFailover();
if (onePhase) {
throw new XAException(XAException.XAER_RMFAIL);
}
else {
ActiveMQClientLogger.LOGGER.commitAfterFailover();
}
}
// Note - don't need to flush acks since the previous end would have

View File

@ -93,6 +93,8 @@ public interface ClientSessionInternal extends ClientSession {
void resetIfNeeded() throws ActiveMQException;
void markRollbackOnly();
/**
* This is used internally to control and educate the user
* about using the thread boundaries properly.

View File

@ -141,6 +141,10 @@ public class DelegatingSession implements ClientSessionInternal {
session.close();
}
public void markRollbackOnly() {
session.markRollbackOnly();
}
public void commit() throws ActiveMQException {
session.commit();
}

View File

@ -38,9 +38,9 @@ import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.ra.ActiveMQRALogger;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
import org.apache.activemq.artemis.ra.ActiveMQRALogger;
import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter;
import org.apache.activemq.artemis.service.extensions.ServiceUtils;
import org.apache.activemq.artemis.service.extensions.xa.ActiveMQXAResourceWrapper;
@ -292,6 +292,11 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
if (activation.getActivationSpec().getTransactionTimeout() > 0 && tm != null) {
tm.setTransactionTimeout(activation.getActivationSpec().getTransactionTimeout());
}
if (trace) {
ActiveMQRALogger.LOGGER.trace("HornetQMessageHandler::calling beforeDelivery on message " + message);
}
endpoint.beforeDelivery(ActiveMQActivation.ONMESSAGE);
beforeDelivery = true;
msg.doBeforeReceive();
@ -299,13 +304,17 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
//In the transacted case the message must be acked *before* onMessage is called
if (transacted) {
message.acknowledge();
message.individualAcknowledge();
}
((MessageListener) endpoint).onMessage(msg);
if (!transacted) {
message.acknowledge();
message.individualAcknowledge();
}
if (trace) {
ActiveMQRALogger.LOGGER.trace("HornetQMessageHandler::calling afterDelivery on message " + message);
}
try {
@ -313,6 +322,10 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
}
catch (ResourceException e) {
ActiveMQRALogger.LOGGER.unableToCallAfterDelivery(e);
// If we get here, The TX was already rolled back
// However we must do some stuff now to make sure the client message buffer is cleared
// so we mark this as rollbackonly
session.markRollbackOnly();
return;
}
if (useLocalTx) {
@ -340,13 +353,6 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
}
catch (Exception e1) {
ActiveMQRALogger.LOGGER.warn("unnable to clear the transaction", e1);
try {
session.rollback();
}
catch (ActiveMQException e2) {
ActiveMQRALogger.LOGGER.warn("Unable to rollback", e2);
return;
}
}
}
@ -369,6 +375,10 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
ActiveMQRALogger.LOGGER.unableToRollbackTX();
}
}
// This is to make sure we will issue a rollback after failures
// so that would cleanup consumer buffers among other things
session.markRollbackOnly();
}
finally {
try {

View File

@ -45,8 +45,8 @@ import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.message.impl.MessageImpl;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
@ -58,10 +58,10 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
@ -1548,7 +1548,7 @@ public class QueueImpl implements Queue {
Binding binding = postOffice.getBinding(originalMessageQueue);
if (binding != null && binding instanceof LocalQueueBinding) {
targetQueue = ((LocalQueueBinding)binding).getID();
targetQueue = ((LocalQueueBinding) binding).getID();
queues.put(originalMessageQueue, targetQueue);
}
}
@ -1562,12 +1562,10 @@ public class QueueImpl implements Queue {
}
}
}
});
}
public synchronized boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception {
@ -2025,6 +2023,9 @@ public class QueueImpl implements Queue {
private void internalAddRedistributor(final Executor executor) {
// create the redistributor only once if there are no local consumers
if (consumerSet.isEmpty() && redistributor == null) {
if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("QueueImpl::Adding redistributor on queue " + this.toString());
}
redistributor = new Redistributor(this, storageManager, postOffice, executor, QueueImpl.REDISTRIBUTOR_BATCH_SIZE);
consumerList.add(new ConsumerHolder(redistributor));
@ -2103,7 +2104,7 @@ public class QueueImpl implements Queue {
final MessageReference ref,
final boolean expiry,
final boolean rejectDuplicate,
final long ... queueIDs) throws Exception {
final long... queueIDs) throws Exception {
ServerMessage copyMessage = makeCopy(ref, expiry);
copyMessage.setAddress(toAddress);

View File

@ -380,7 +380,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
@Override
public void close(final boolean failed) throws Exception {
if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("ServerConsumerImpl::" + this + " being closed with failed=" + failed, new Exception("trace"));
ActiveMQServerLogger.LOGGER.trace("ServerConsumerImpl::" + this + " being closed with failed=" + failed, new Exception("trace"));
}
callback.removeReadyListener(this);
@ -405,7 +405,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
MessageReference ref = iter.next();
if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("ServerConsumerImpl::" + this + " cancelling reference " + ref);
ActiveMQServerLogger.LOGGER.trace("ServerConsumerImpl::" + this + " cancelling reference " + ref);
}
ref.getQueue().cancel(tx, ref, true);
@ -662,14 +662,20 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
MessageReference ref;
do {
ref = deliveringRefs.poll();
synchronized (lock) {
ref = deliveringRefs.poll();
}
if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) {
ActiveMQServerLogger.LOGGER.trace("ACKing ref " + ref + " on tx= " + tx + ", consumer=" + this);
}
if (ref == null) {
throw ActiveMQMessageBundle.BUNDLE.consumerNoReference(id, messageID, messageQueue.getName());
ActiveMQIllegalStateException ils = ActiveMQMessageBundle.BUNDLE.consumerNoReference(id, messageID, messageQueue.getName());
if (tx != null) {
tx.markAsRollbackOnly(ils);
}
throw ils;
}
ackReference(tx, ref);
@ -719,7 +725,11 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
MessageReference ref = removeReferenceByID(messageID);
if (ref == null) {
throw new IllegalStateException("Cannot find ref to ack " + messageID);
ActiveMQIllegalStateException ils = ActiveMQMessageBundle.BUNDLE.consumerNoReference(id, messageID, messageQueue.getName());
if (tx != null) {
tx.markAsRollbackOnly(ils);
}
throw ils;
}
ackReference(tx, ref);
@ -752,23 +762,29 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
// Expiries can come in out of sequence with respect to delivery order
Iterator<MessageReference> iter = deliveringRefs.iterator();
MessageReference ref = null;
while (iter.hasNext()) {
MessageReference theRef = iter.next();
if (theRef.getMessage().getMessageID() == messageID) {
iter.remove();
ref = theRef;
break;
synchronized (lock) {
// This is an optimization, if the reference is the first one, we just poll it.
if (deliveringRefs.peek().getMessage().getMessageID() == messageID) {
return deliveringRefs.poll();
}
}
return ref;
Iterator<MessageReference> iter = deliveringRefs.iterator();
MessageReference ref = null;
while (iter.hasNext()) {
MessageReference theRef = iter.next();
if (theRef.getMessage().getMessageID() == messageID) {
iter.remove();
ref = theRef;
break;
}
}
return ref;
}
}
public void readyForWriting(final boolean ready) {

View File

@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
@ -668,18 +669,22 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
public void acknowledge(final long consumerID, final long messageID) throws Exception {
ServerConsumer consumer = consumers.get(consumerID);
if (consumer == null) {
throw ActiveMQMessageBundle.BUNDLE.consumerDoesntExist(consumerID);
}
ServerConsumer consumer = findConsumer(consumerID);
if (tx != null && tx.getState() == State.ROLLEDBACK) {
// JBPAPP-8845 - if we let stuff to be acked on a rolled back TX, we will just
// have these messages to be stuck on the limbo until the server is restarted
// The tx has already timed out, so we need to ack and rollback immediately
Transaction newTX = newTransaction();
consumer.acknowledge(newTX, messageID);
try {
consumer.acknowledge(newTX, messageID);
}
catch (Exception e) {
// just ignored
// will log it just in case
ActiveMQServerLogger.LOGGER.debug("Ignored exception while acking messageID " + messageID +
" on a rolledback TX", e);
}
newTX.rollback();
}
else {
@ -687,9 +692,25 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
}
public void individualAcknowledge(final long consumerID, final long messageID) throws Exception {
private ServerConsumer findConsumer(long consumerID) throws Exception {
ServerConsumer consumer = consumers.get(consumerID);
if (consumer == null) {
Transaction currentTX = tx;
ActiveMQIllegalStateException exception = ActiveMQMessageBundle.BUNDLE.consumerDoesntExist(consumerID);
if (currentTX != null) {
currentTX.markAsRollbackOnly(exception);
}
throw exception;
}
return consumer;
}
public void individualAcknowledge(final long consumerID, final long messageID) throws Exception {
ServerConsumer consumer = findConsumer(consumerID);
if (tx != null && tx.getState() == State.ROLLEDBACK) {
// JBPAPP-8845 - if we let stuff to be acked on a rolled back TX, we will just
// have these messages to be stuck on the limbo until the server is restarted

View File

@ -14,12 +14,20 @@ package org.apache.activemq.artemis.tests.extras.jms.ra;
import javax.jms.Message;
import javax.resource.ResourceException;
import javax.resource.spi.LocalTransactionException;
import javax.resource.spi.UnavailableException;
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.RollbackException;
import javax.transaction.Status;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.reflect.Method;
import java.util.LinkedList;
import java.util.List;
@ -37,12 +45,13 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter;
import org.apache.activemq.artemis.ra.inflow.ActiveMQActivation;
import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.extras.jms.bridge.TransactionManagerLocatorImpl;
import org.apache.activemq.artemis.tests.integration.ra.ActiveMQRATestBase;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.junit.After;
@ -61,6 +70,10 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
ServerLocator nettyLocator;
private volatile boolean playTXTimeouts = true;
private volatile boolean playServerClosingSession = true;
private volatile boolean playServerClosingConsumer = true;
@Before
public void setUp() throws Exception {
nettyLocator = createNettyNonHALocator();
@ -91,7 +104,7 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
@Test
public void testReconnectMDBNoMessageLoss() throws Exception {
AddressSettings settings = new AddressSettings();
settings.setRedeliveryDelay(1000);
settings.setRedeliveryDelay(100);
settings.setMaxDeliveryAttempts(-1);
server.getAddressSettingsRepository().clear();
server.getAddressSettingsRepository().addMatch("#", settings);
@ -125,8 +138,9 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
qResourceAdapter.endpointActivation(endpointFactory, spec);
Assert.assertEquals(1, resourceAdapter.getActivations().values().size());
ActiveMQActivation activation = resourceAdapter.getActivations().values().toArray(new ActiveMQActivation[1])[0];
final int NUMBER_OF_MESSAGES = 3000;
final int NUMBER_OF_MESSAGES = 1000;
Thread producer = new Thread() {
public void run() {
@ -178,18 +192,12 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
return;
}
List<ServerSession> serverSessions = new LinkedList<>();
for (ServerSession session : server.getSessions()) {
if (session.getMetaData("resource-adapter") != null) {
serverSessions.add(session);
}
}
List<ServerSession> serverSessions = lookupServerSessions("resource-adapter");
System.err.println("Contains " + serverSessions.size() + " RA sessions");
if (serverSessions.size() != NUMBER_OF_SESSIONS) {
System.err.println("the server was supposed to have " + NUMBER_OF_SESSIONS + " RA Sessions but it only contained accordingly to the meta-data");
System.err.println("the server was supposed to have " + NUMBER_OF_MESSAGES + " RA Sessions but it only contained accordingly to the meta-data");
metaDataFailed.set(true);
}
else if (serverSessions.size() == NUMBER_OF_SESSIONS) {
@ -197,12 +205,29 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
metaDataFailed.set(false);
}
if (serverSessions.size() > 0) {
if (playServerClosingSession && serverSessions.size() > 0) {
int randomBother = RandomUtil.randomInterval(0, serverSessions.size() - 1);
System.out.println("bugging session " + randomBother);
RemotingConnection connection = serverSessions.get(randomBother).getRemotingConnection();
ServerSession serverSession = serverSessions.get(randomBother);
if (playServerClosingConsumer && RandomUtil.randomBoolean()) {
// will play this randomly, only half of the times
for (ServerConsumer consumer : serverSession.getServerConsumers()) {
try {
// Simulating a rare race that could happen in production
// where the consumer is closed while things are still happening
consumer.close(true);
Thread.sleep(100);
}
catch (Exception e) {
e.printStackTrace();
}
}
}
RemotingConnection connection = serverSession.getRemotingConnection();
connection.fail(new ActiveMQException("failed at random " + randomBother));
}
@ -221,11 +246,21 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
ClientConsumer consumer = session.createConsumer("jms.queue.outQueue");
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
ClientMessage message = consumer.receive(5000);
ClientMessage message = consumer.receive(60000);
if (message == null) {
break;
}
if (i == NUMBER_OF_MESSAGES * 0.90) {
System.out.println("Disabled failures at " + i);
playTXTimeouts = false;
playServerClosingSession = false;
playServerClosingConsumer = false;
}
System.out.println("Received " + i + " messages");
Assert.assertNotNull(message);
message.acknowledge();
@ -247,16 +282,19 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
session.commit();
Assert.assertNull(consumer.receiveImmediate());
StringWriter writer = new StringWriter();
PrintWriter out = new PrintWriter(writer);
boolean failed = false;
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
AtomicInteger atomicInteger = mapCounter.get(Integer.valueOf(i));
if (atomicInteger == null) {
System.out.println("didn't receive message with i=" + i);
out.println("didn't receive message with i=" + i);
failed = true;
}
else if (atomicInteger.get() > 1) {
System.out.println("message with i=" + i + " received " + atomicInteger.get() + " times");
out.println("message with i=" + i + " received " + atomicInteger.get() + " times");
failed = true;
}
}
@ -266,15 +304,34 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
buggerThread.join();
producer.join();
Assert.assertFalse("There was meta-data failures, some sessions didn't reconnect properly", metaDataFailed.get());
qResourceAdapter.stop();
session.close();
if (failed) {
for (int i = 0; i < 10; i++) {
System.out.println("----------------------------------------------------");
}
System.out.println(writer.toString());
}
Assert.assertFalse(failed);
System.out.println("Received " + NUMBER_OF_MESSAGES + " messages");
qResourceAdapter.stop();
Assert.assertFalse("There was meta-data failures, some sessions didn't reconnect properly", metaDataFailed.get());
session.close();
}
private List<ServerSession> lookupServerSessions(String parameter) {
List<ServerSession> serverSessions = new LinkedList<ServerSession>();
for (ServerSession session : server.getSessions()) {
if (session.getMetaData(parameter) != null) {
serverSessions.add(session);
}
}
return serverSessions;
}
protected class TestEndpointFactory implements MessageEndpointFactory {
@ -330,17 +387,10 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
catch (Throwable e) {
throw new RuntimeException(e.getMessage(), e);
}
}
public void onMessage(Message message) {
// try
// {
// System.out.println(Thread.currentThread().getName() + "**** onMessage enter " + message.getIntProperty("i"));
// }
// catch (Exception e)
// {
// }
Integer value = 0;
try {
@ -355,9 +405,15 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
try {
currentTX.enlistResource(endpointSession);
ClientMessage message1 = endpointSession.createMessage(true);
message1.putIntProperty("i", message.getIntProperty("i"));
message1.putIntProperty("i", value);
producer.send(message1);
currentTX.delistResource(endpointSession, XAResource.TMSUCCESS);
if (playTXTimeouts) {
if (RandomUtil.randomInterval(0, 5) == 3) {
Thread.sleep(2000);
}
}
}
catch (Exception e) {
e.printStackTrace();
@ -373,11 +429,26 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
@Override
public void afterDelivery() throws ResourceException {
// This is a copy & paste of what the Application server would do here
try {
DummyTMLocator.tm.commit();
// currentTX.commit();
if (currentTX.getStatus() == Status.STATUS_MARKED_ROLLBACK) {
DummyTMLocator.tm.rollback();
}
else {
DummyTMLocator.tm.commit();
}
}
catch (Throwable e) {
catch (HeuristicMixedException e) {
throw new LocalTransactionException(e);
}
catch (SystemException e) {
throw new LocalTransactionException(e);
}
catch (HeuristicRollbackException e) {
throw new LocalTransactionException(e);
}
catch (RollbackException e) {
throw new LocalTransactionException(e);
}
super.afterDelivery();
}
@ -389,7 +460,6 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
public static void stopTM() {
try {
TransactionManagerLocatorImpl.setTransactionManager(null);
TransactionReaper.terminate(true);
TxControl.disable(true);
}
@ -401,7 +471,6 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
public static void startTM() {
tm = new TransactionManagerImple();
TransactionManagerLocatorImpl.setTransactionManager(tm);
TxControl.enable();
}

View File

@ -1170,7 +1170,7 @@ public class FailoverTest extends FailoverTestBase {
crash(session);
try {
session.commit(xid, true);
session.commit(xid, false);
Assert.fail("Should throw exception");
}
@ -1374,7 +1374,7 @@ public class FailoverTest extends FailoverTestBase {
crash(session2);
try {
session2.commit(xid, true);
session2.commit(xid, false);
Assert.fail("Should throw exception");
}