further evolution of resolution for https://issues.apache.org/activemq/browse/AMQ-2590 - indoubt transactions are now rolledback, pending transactions can wait for jms.consumerFailoverRedeliveryWaitPeriod for redeliveries before rolling back. If previously delivered messages are not replayed the transaction is rolledback

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@906450 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-02-04 11:06:13 +00:00
parent 3253a421c1
commit b836af84f0
9 changed files with 281 additions and 162 deletions

View File

@ -187,6 +187,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private final Object ensureConnectionInfoSentMutex = new Object(); private final Object ensureConnectionInfoSentMutex = new Object();
private boolean useDedicatedTaskRunner; private boolean useDedicatedTaskRunner;
protected CountDownLatch transportInterruptionProcessingComplete; protected CountDownLatch transportInterruptionProcessingComplete;
private long consumerFailoverRedeliveryWaitPeriod;
/** /**
* Construct an <code>ActiveMQConnection</code> * Construct an <code>ActiveMQConnection</code>
@ -2244,7 +2245,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException { protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException {
if (transportInterruptionProcessingComplete != null) { if (transportInterruptionProcessingComplete != null) {
while (!closed.get() && !transportFailed.get() && !transportInterruptionProcessingComplete.await(15, TimeUnit.SECONDS)) { while (!closed.get() && !transportFailed.get() && !transportInterruptionProcessingComplete.await(10, TimeUnit.SECONDS)) {
LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + transportInterruptionProcessingComplete.getCount() + ") to complete.."); LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + transportInterruptionProcessingComplete.getCount() + ") to complete..");
} }
signalInterruptionProcessingComplete(); signalInterruptionProcessingComplete();
@ -2262,16 +2263,32 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private void signalInterruptionProcessingComplete() throws InterruptedException { private void signalInterruptionProcessingComplete() throws InterruptedException {
if (transportInterruptionProcessingComplete.await(0, TimeUnit.SECONDS)) { if (transportInterruptionProcessingComplete.await(0, TimeUnit.SECONDS)) {
if (LOG.isDebugEnabled()) {
LOG.debug("transportInterruptionProcessingComplete for: " + this.getConnectionInfo().getConnectionId());
}
synchronized (this) { synchronized (this) {
transportInterruptionProcessingComplete = null; transportInterruptionProcessingComplete = null;
FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class); FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
if (failoverTransport != null) { if (failoverTransport != null) {
failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId()); failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("transportInterruptionProcessingComplete for: " + this.getConnectionInfo().getConnectionId()); LOG.debug("notified failover transport (" + failoverTransport +") of interruption completion for: " + this.getConnectionInfo().getConnectionId());
} }
} }
} }
} }
} }
/*
* specify the amount of time in milliseconds that a consumer with a transaction pending recovery
* will wait to receive re dispatched messages.
* default value is 0 so there is no wait by default.
*/
public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
}
public long getConsumerFailoverRedeliveryWaitPeriod() {
return consumerFailoverRedeliveryWaitPeriod;
}
} }

View File

@ -114,6 +114,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE; private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT; private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
private boolean useDedicatedTaskRunner; private boolean useDedicatedTaskRunner;
private long consumerFailoverRedeliveryWaitPeriod = 0;
// ///////////////////////////////////////////// // /////////////////////////////////////////////
// //
@ -315,6 +316,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
connection.setAuditDepth(getAuditDepth()); connection.setAuditDepth(getAuditDepth());
connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber()); connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber());
connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner()); connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner());
connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod());
if (transportListener != null) { if (transportListener != null) {
connection.addTransportListener(transportListener); connection.addTransportListener(transportListener);
} }
@ -913,4 +915,12 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
public boolean isUseDedicatedTaskRunner() { public boolean isUseDedicatedTaskRunner() {
return useDedicatedTaskRunner; return useDedicatedTaskRunner;
} }
public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
}
public long getConsumerFailoverRedeliveryWaitPeriod() {
return consumerFailoverRedeliveryWaitPeriod;
}
} }

View File

@ -112,6 +112,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
// not been acknowledged. It's kept in reverse order since we // not been acknowledged. It's kept in reverse order since we
// Always walk list in reverse order. // Always walk list in reverse order.
private final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>(); private final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();
// track duplicate deliveries in a transaction such that the tx integrity can be validated
private HashMap<MessageId, Boolean> previouslyDeliveredMessages; private HashMap<MessageId, Boolean> previouslyDeliveredMessages;
private int deliveredCounter; private int deliveredCounter;
private int additionalWindowSize; private int additionalWindowSize;
@ -141,6 +142,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
private long optimizeAckTimestamp = System.currentTimeMillis(); private long optimizeAckTimestamp = System.currentTimeMillis();
private long optimizeAckTimeout = 300; private long optimizeAckTimeout = 300;
private long failoverRedeliveryWaitPeriod = 0;
private boolean rollbackInitiated;
/** /**
* Create a MessageConsumer * Create a MessageConsumer
@ -228,7 +231,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge() this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge()
&& !info.isBrowser(); && !info.isBrowser();
this.info.setOptimizedAcknowledge(this.optimizeAcknowledge); this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod();
if (messageListener != null) { if (messageListener != null) {
setMessageListener(messageListener); setMessageListener(messageListener);
} }
@ -948,6 +951,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
*/ */
public void acknowledge() throws JMSException { public void acknowledge() throws JMSException {
clearDispatchList(); clearDispatchList();
waitForRedeliveries();
synchronized(deliveredMessages) { synchronized(deliveredMessages) {
// Acknowledge all messages so far. // Acknowledge all messages so far.
MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
@ -972,51 +976,67 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
} }
} }
private void waitForRedeliveries() {
if (failoverRedeliveryWaitPeriod > 0 && previouslyDeliveredMessages != null) {
long expiry = System.currentTimeMillis() + failoverRedeliveryWaitPeriod;
int numberNotReplayed;
do {
numberNotReplayed = 0;
synchronized(deliveredMessages) {
if (previouslyDeliveredMessages != null) {
for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
if (!entry.getValue()) {
numberNotReplayed++;
}
}
}
}
if (numberNotReplayed > 0) {
LOG.info("waiting for redelivery of " + numberNotReplayed + " to consumer :" + this.getConsumerId());
try {
Thread.sleep(Math.max(500, failoverRedeliveryWaitPeriod/4));
} catch (InterruptedException outOfhere) {
break;
}
}
} while (numberNotReplayed > 0 && expiry < System.currentTimeMillis());
}
}
/* /*
* called with deliveredMessages locked * called with deliveredMessages locked
*/ */
private void rollbackOnFailedRecoveryRedelivery() throws JMSException { private void rollbackOnFailedRecoveryRedelivery() throws JMSException {
if (previouslyDeliveredMessages != null) { if (previouslyDeliveredMessages != null) {
if (rollbackInitiated) {
// second call from rollback, nothing more to do
// REVISIT - should beforeEnd be called again by transaction context?
rollbackInitiated = false;
return;
}
// if any previously delivered messages was not re-delivered, transaction is invalid and must rollback // if any previously delivered messages was not re-delivered, transaction is invalid and must rollback
// as messages have been dispatched else where. // as messages have been dispatched else where.
int numberNotReplayed = 0; int numberNotReplayed = 0;
for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) { for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
if (!entry.getValue()) { if (!entry.getValue()) {
numberNotReplayed++; numberNotReplayed++;
// allow outstanding messages to get delivered again
removeFromDeliveredMessages(entry.getKey());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("previously delivered message has not been replayed in transaction, id: " + entry.getKey()); LOG.debug("previously delivered message has not been replayed in transaction, id: " + entry.getKey());
} }
} }
} }
clearPreviouslyDelivered();
if (numberNotReplayed > 0) { if (numberNotReplayed > 0) {
String message = "rolling back transaction post failover recovery. " + numberNotReplayed String message = "rolling back transaction post failover recovery. " + numberNotReplayed
+ " previously delivered message(s) not replayed to consumer: " + this.getConsumerId(); + " previously delivered message(s) not replayed to consumer: " + this.getConsumerId();
LOG.warn(message); LOG.warn(message);
rollbackInitiated = true;
throw new TransactionRolledBackException(message); throw new TransactionRolledBackException(message);
} }
} }
} }
/*
* called with deliveredMessages locked
*/
private void removeFromDeliveredMessages(MessageId key) {
ListIterator<MessageDispatch> iterator = deliveredMessages.listIterator(deliveredMessages.size());
while (iterator.hasPrevious()) {
MessageDispatch candidate = iterator.previous();
if (key.equals(candidate.getMessage().getMessageId())) {
session.connection.rollbackDuplicate(this, candidate.getMessage());
iterator.remove();
break;
}
}
}
void acknowledge(MessageDispatch md) throws JMSException { void acknowledge(MessageDispatch md) throws JMSException {
MessageAck ack = new MessageAck(md,MessageAck.INDIVIDUAL_ACK_TYPE,1); MessageAck ack = new MessageAck(md,MessageAck.INDIVIDUAL_ACK_TYPE,1);
session.sendAck(ack); session.sendAck(ack);
@ -1049,7 +1069,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
} }
} }
synchronized(deliveredMessages) { synchronized(deliveredMessages) {
clearPreviouslyDelivered(); rollbackPreviouslyDeliveredAndNotRedelivered();
if (deliveredMessages.isEmpty()) { if (deliveredMessages.isEmpty()) {
return; return;
} }
@ -1125,6 +1145,37 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
} }
} }
/*
* called with unconsumedMessages && deliveredMessages locked
* remove any message not re-delivered as they can't be replayed to this
* consumer on rollback
*/
private void rollbackPreviouslyDeliveredAndNotRedelivered() {
if (previouslyDeliveredMessages != null) {
for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
if (!entry.getValue()) {
removeFromDeliveredMessages(entry.getKey());
}
}
rollbackInitiated = false;
clearPreviouslyDelivered();
}
}
/*
* called with deliveredMessages locked
*/
private void removeFromDeliveredMessages(MessageId key) {
ListIterator<MessageDispatch> iterator = deliveredMessages.listIterator(deliveredMessages.size());
while (iterator.hasPrevious()) {
MessageDispatch candidate = iterator.previous();
if (key.equals(candidate.getMessage().getMessageId())) {
session.connection.rollbackDuplicate(this, candidate.getMessage());
iterator.remove();
break;
}
}
}
/* /*
* called with deliveredMessages locked * called with deliveredMessages locked
*/ */
@ -1170,7 +1221,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
} else { } else {
if (!session.isTransacted()) { if (!session.isTransacted()) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(getConsumerId() + " ignoring(auto acking) duplicate: " + md.getMessage()); LOG.debug(getConsumerId() + " ignoring (auto acking) duplicate: " + md.getMessage());
} }
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
session.sendAck(ack); session.sendAck(ack);
@ -1178,12 +1229,24 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(getConsumerId() + " tracking transacted redlivery of duplicate: " + md.getMessage()); LOG.debug(getConsumerId() + " tracking transacted redlivery of duplicate: " + md.getMessage());
} }
boolean needsPoisonAck = false;
synchronized (deliveredMessages) { synchronized (deliveredMessages) {
if (previouslyDeliveredMessages != null) { if (previouslyDeliveredMessages != null) {
previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true); previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true);
} else {
// existing transaction gone but still a duplicate!, lets mark as poison ftm,
// possibly could allow redelivery..
needsPoisonAck = true;
} }
} }
ackLater(md, MessageAck.DELIVERED_ACK_TYPE); if (needsPoisonAck) {
LOG.warn("acking as poison, duplicate transacted delivery but no recovering transaction for: " + md);
MessageAck poisonAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
poisonAck.setFirstMessageId(md.getMessage().getMessageId());
session.sendAck(poisonAck);
} else {
ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
}
} }
} }
} }
@ -1197,7 +1260,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
} }
} }
// async (on next call) clear delivered as they will be auto-acked as duplicates if they arrive again // async (on next call) clear or track delivered as they may be flagged as duplicates if they arrive again
private void clearDispatchList() { private void clearDispatchList() {
if (clearDispatchList) { if (clearDispatchList) {
synchronized (deliveredMessages) { synchronized (deliveredMessages) {
@ -1205,7 +1268,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
if (!deliveredMessages.isEmpty()) { if (!deliveredMessages.isEmpty()) {
if (session.isTransacted()) { if (session.isTransacted()) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(getConsumerId() + " tracking delivered list (" + deliveredMessages.size() + ") on transport interrupt"); LOG.debug(getConsumerId() + " tracking existing transacted delivered list (" + deliveredMessages.size() + ") on transport interrupt");
} }
if (previouslyDeliveredMessages == null) { if (previouslyDeliveredMessages == null) {
previouslyDeliveredMessages = new HashMap<MessageId, Boolean>(); previouslyDeliveredMessages = new HashMap<MessageId, Boolean>();

View File

@ -293,11 +293,21 @@ public class TransactionContext implements XAResource {
TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.COMMIT_ONE_PHASE); TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.COMMIT_ONE_PHASE);
this.transactionId = null; this.transactionId = null;
// Notify the listener that the tx was committed back // Notify the listener that the tx was committed back
syncSendPacketWithInterruptionHandling(info); try {
if (localTransactionEventListener != null) { syncSendPacketWithInterruptionHandling(info);
localTransactionEventListener.commitEvent(); if (localTransactionEventListener != null) {
localTransactionEventListener.commitEvent();
}
afterCommit();
} catch (JMSException cause) {
LOG.info("commit failed for transaction " + info.getTransactionId(), cause);
if (localTransactionEventListener != null) {
localTransactionEventListener.rollbackEvent();
}
afterRollback();
throw cause;
} }
afterCommit();
} }
} }

View File

@ -24,6 +24,8 @@ import java.util.Vector;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import javax.jms.TransactionRolledBackException;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConnectionInfo;
@ -31,6 +33,7 @@ import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerId;
@ -141,22 +144,24 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
} }
private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException { private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
Vector<Command> toIgnore = new Vector<Command>(); Vector<TransactionInfo> toRollback = new Vector<TransactionInfo>();
for (TransactionState transactionState : connectionState.getTransactionStates()) { for (TransactionState transactionState : connectionState.getTransactionStates()) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("tx: " + transactionState.getId()); LOG.debug("tx: " + transactionState.getId());
} }
// ignore any empty (ack) transaction // rollback any completed transactions - no way to know if commit got there
if (transactionState.getCommands().size() == 2) { // or if reply went missing
Command lastCommand = transactionState.getCommands().get(1); //
if (!transactionState.getCommands().isEmpty()) {
Command lastCommand = transactionState.getCommands().get(transactionState.getCommands().size() - 1);
if (lastCommand instanceof TransactionInfo) { if (lastCommand instanceof TransactionInfo) {
TransactionInfo transactionInfo = (TransactionInfo) lastCommand; TransactionInfo transactionInfo = (TransactionInfo) lastCommand;
if (transactionInfo.getType() == TransactionInfo.COMMIT_ONE_PHASE) { if (transactionInfo.getType() == TransactionInfo.COMMIT_ONE_PHASE) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("not replaying empty (ack) tx: " + transactionState.getId()); LOG.debug("rolling back potentially completed tx: " + transactionState.getId());
} }
toIgnore.add(lastCommand); toRollback.add(transactionInfo);
continue; continue;
} }
} }
@ -185,9 +190,10 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
} }
} }
for (Command command: toIgnore) { for (TransactionInfo command: toRollback) {
// respond to the outstanding commit // respond to the outstanding commit
Response response = new Response(); ExceptionResponse response = new ExceptionResponse();
response.setException(new TransactionRolledBackException("Transaction completion in doubt due to failover. Forcing rollback of " + command.getTransactionId()));
response.setCorrelationId(command.getCommandId()); response.setCorrelationId(command.getCommandId());
transport.getTransportListener().onCommand(response); transport.getTransportListener().onCommand(response);
} }

View File

@ -221,12 +221,13 @@ public class FailoverTransport implements CompositeTransport {
failedConnectTransportURI=connectedTransportURI; failedConnectTransportURI=connectedTransportURI;
connectedTransportURI = null; connectedTransportURI = null;
connected=false; connected=false;
stateTracker.transportInterrupted();
// notify before any reconnect attempt so ack state can be whacked // notify before any reconnect attempt so ack state can be whacked
if (transportListener != null) { if (transportListener != null) {
transportListener.transportInterupted(); transportListener.transportInterupted();
} }
stateTracker.transportInterrupted();
if (reconnectOk) { if (reconnectOk) {
reconnectTask.wakeup(); reconnectTask.wakeup();

View File

@ -84,7 +84,7 @@ public class JmsSchedulerTest extends EmbeddedBrokerTestSupport {
connection.start(); connection.start();
MessageProducer producer = session.createProducer(destination); MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg"); TextMessage message = session.createTextMessage("test msg");
long time = System.currentTimeMillis() + 1000; long time = System.currentTimeMillis() + 4000;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_START, time); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_START, time);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 50); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 50);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, NUMBER-1); message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, NUMBER-1);

View File

@ -16,13 +16,16 @@
*/ */
package org.apache.activemq.transport.failover; package org.apache.activemq.transport.failover;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Message; import javax.jms.Message;
@ -51,6 +54,7 @@ public class FailoverConsumerOutstandingCommitTest {
private static final Log LOG = LogFactory.getLog(FailoverConsumerOutstandingCommitTest.class); private static final Log LOG = LogFactory.getLog(FailoverConsumerOutstandingCommitTest.class);
private static final String QUEUE_NAME = "FailoverWithOutstandingCommit"; private static final String QUEUE_NAME = "FailoverWithOutstandingCommit";
private static final String MESSAGE_TEXT = "Test message ";
private String url = "tcp://localhost:61616"; private String url = "tcp://localhost:61616";
final int prefetch = 10; final int prefetch = 10;
BrokerService broker; BrokerService broker;
@ -126,7 +130,7 @@ public class FailoverConsumerOutstandingCommitTest {
connection.start(); connection.start();
final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue destination = producerSession.createQueue(QUEUE_NAME + "?jms.consumer.prefetch=" + prefetch); final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=" + prefetch);
final Session consumerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); final Session consumerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
@ -166,8 +170,17 @@ public class FailoverConsumerOutstandingCommitTest {
connection.close(); connection.close();
} }
@Test @Test
public void testFailoverConsumerOutstandingSendTx() throws Exception { public void TestFailoverConsumerOutstandingSendTxIncomplete() throws Exception {
doTestFailoverConsumerOutstandingSendTx(false);
}
@Test
public void TestFailoverConsumerOutstandingSendTxComplete() throws Exception {
doTestFailoverConsumerOutstandingSendTx(true);
}
public void doTestFailoverConsumerOutstandingSendTx(final boolean doActualBrokerCommit) throws Exception {
final boolean watchTopicAdvisories = true; final boolean watchTopicAdvisories = true;
broker = createBroker(true); broker = createBroker(true);
@ -175,6 +188,10 @@ public class FailoverConsumerOutstandingCommitTest {
@Override @Override
public void commitTransaction(ConnectionContext context, public void commitTransaction(ConnectionContext context,
TransactionId xid, boolean onePhase) throws Exception { TransactionId xid, boolean onePhase) throws Exception {
if (doActualBrokerCommit) {
LOG.info("doing actual broker commit...");
super.commitTransaction(context, xid, onePhase);
}
// so commit will hang as if reply is lost // so commit will hang as if reply is lost
context.setDontSendReponse(true); context.setDontSendReponse(true);
Executors.newSingleThreadExecutor().execute(new Runnable() { Executors.newSingleThreadExecutor().execute(new Runnable() {
@ -200,25 +217,27 @@ public class FailoverConsumerOutstandingCommitTest {
final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue destination = producerSession.createQueue(QUEUE_NAME final Queue destination = producerSession.createQueue(QUEUE_NAME
+ "?jms.consumer.prefetch=" + prefetch); + "?consumer.prefetchSize=" + prefetch);
final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED); final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
final CountDownLatch commitDoneLatch = new CountDownLatch(1); final CountDownLatch commitDoneLatch = new CountDownLatch(1);
final CountDownLatch messagesReceived = new CountDownLatch(2); final CountDownLatch messagesReceived = new CountDownLatch(3);
final AtomicBoolean gotCommitException = new AtomicBoolean(false);
final ArrayList<TextMessage> receivedMessages = new ArrayList<TextMessage>();
final MessageConsumer testConsumer = consumerSession.createConsumer(destination); final MessageConsumer testConsumer = consumerSession.createConsumer(destination);
testConsumer.setMessageListener(new MessageListener() { testConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) { public void onMessage(Message message) {
LOG.info("consume one and commit"); LOG.info("consume one and commit: " + message);
assertNotNull("got message", message); assertNotNull("got message", message);
receivedMessages.add((TextMessage) message);
try { try {
produceMessage(consumerSession, destination, 1); produceMessage(consumerSession, destination, 1);
consumerSession.commit(); consumerSession.commit();
} catch (JMSException e) { } catch (JMSException e) {
e.printStackTrace(); LOG.info("commit exception", e);
gotCommitException.set(true);
} }
commitDoneLatch.countDown(); commitDoneLatch.countDown();
messagesReceived.countDown(); messagesReceived.countDown();
@ -233,9 +252,14 @@ public class FailoverConsumerOutstandingCommitTest {
broker = createBroker(false); broker = createBroker(false);
broker.start(); broker.start();
assertTrue("consumer added through failover", commitDoneLatch.await(20, TimeUnit.SECONDS)); assertTrue("commit done through failover", commitDoneLatch.await(20, TimeUnit.SECONDS));
assertTrue("commit failed", gotCommitException.get());
assertTrue("another message was received after failover", messagesReceived.await(20, TimeUnit.SECONDS)); assertTrue("another message was received after failover", messagesReceived.await(20, TimeUnit.SECONDS));
assertEquals("get message 0 first", MESSAGE_TEXT + "0", receivedMessages.get(0).getText());
// it was redelivered
assertEquals("get message 0 second", MESSAGE_TEXT + "0", receivedMessages.get(1).getText());
assertTrue("another message was received", messagesReceived.await(20, TimeUnit.SECONDS));
assertEquals("get message 1 eventually", MESSAGE_TEXT + "1", receivedMessages.get(2).getText());
connection.close(); connection.close();
} }
@ -245,7 +269,8 @@ public class FailoverConsumerOutstandingCommitTest {
broker = createBroker(true); broker = createBroker(true);
broker.start(); broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
cf.setConsumerFailoverRedeliveryWaitPeriod(10000);
final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection(); final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
connection.start(); connection.start();
@ -300,7 +325,7 @@ public class FailoverConsumerOutstandingCommitTest {
throws JMSException { throws JMSException {
MessageProducer producer = producerSession.createProducer(destination); MessageProducer producer = producerSession.createProducer(destination);
for (int i=0; i<count; i++) { for (int i=0; i<count; i++) {
TextMessage message = producerSession.createTextMessage("Test message " + i); TextMessage message = producerSession.createTextMessage(MESSAGE_TEXT + i);
producer.send(message); producer.send(message);
} }
producer.close(); producer.close();

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.activemq.transport.failover; package org.apache.activemq.transport.failover;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -40,6 +39,7 @@ import javax.jms.TextMessage;
import javax.jms.TransactionRolledBackException; import javax.jms.TransactionRolledBackException;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport; import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
@ -55,6 +55,7 @@ import org.junit.After;
import org.junit.Test; import org.junit.Test;
// see https://issues.apache.org/activemq/browse/AMQ-2473 // see https://issues.apache.org/activemq/browse/AMQ-2473
// https://issues.apache.org/activemq/browse/AMQ-2590
public class FailoverTransactionTest { public class FailoverTransactionTest {
private static final Log LOG = LogFactory.getLog(FailoverTransactionTest.class); private static final Log LOG = LogFactory.getLog(FailoverTransactionTest.class);
@ -167,11 +168,12 @@ public class FailoverTransactionTest {
LOG.info("doing async commit..."); LOG.info("doing async commit...");
try { try {
session.commit(); session.commit();
commitDoneLatch.countDown(); } catch (JMSException e) {
LOG.info("done async commit"); assertTrue(e instanceof TransactionRolledBackException);
} catch (Exception e) { LOG.info("got commit exception: ", e);
e.printStackTrace();
} }
commitDoneLatch.countDown();
LOG.info("done async commit");
} }
}); });
@ -285,110 +287,8 @@ public class FailoverTransactionTest {
} }
session.commit(); session.commit();
connection.close(); connection.close();
}
@Test
public void testFailoverConsumerCommitLost() throws Exception {
final int adapter = 0;
broker = createBroker(true);
setPersistenceAdapter(adapter);
broker.setPlugins(new BrokerPlugin[] {
new BrokerPluginSupport() {
@Override
public void commitTransaction(ConnectionContext context,
TransactionId xid, boolean onePhase) throws Exception {
super.commitTransaction(context, xid, onePhase);
// so commit will hang as if reply is lost
context.setDontSendReponse(true);
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("Stopping broker post commit...");
try {
broker.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
});
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
Connection connection = cf.createConnection();
connection.start();
final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Session consumerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue destination = producerSession.createQueue(QUEUE_NAME);
final MessageConsumer consumer = consumerSession.createConsumer(destination);
produceMessage(producerSession, destination);
final Vector<Message> receivedMessages = new Vector<Message>();
final CountDownLatch commitDoneLatch = new CountDownLatch(1);
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("doing async commit after consume...");
try {
Message msg = consumer.receive(20000);
LOG.info("Got message: " + msg);
receivedMessages.add(msg);
consumerSession.commit();
commitDoneLatch.countDown();
LOG.info("done async commit");
} catch (Exception e) {
e.printStackTrace();
}
}
});
// will be stopped by the plugin
broker.waitUntilStopped();
broker = createBroker(false);
setPersistenceAdapter(adapter);
broker.start();
assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
assertEquals("we got a message", 1, receivedMessages.size());
// new transaction
Message msg = consumer.receive(20000);
LOG.info("Received: " + msg);
assertNull("we did not get a duplicate message", msg);
consumerSession.commit();
consumer.close();
connection.close();
// ensure no dangling messages with fresh broker etc
broker.stop();
broker.waitUntilStopped();
LOG.info("Checking for remaining/hung messages..");
broker = createBroker(false);
setPersistenceAdapter(adapter);
broker.start();
// after restart, ensure no dangling messages
cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
connection = cf.createConnection();
connection.start();
Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer2 = session2.createConsumer(destination);
msg = consumer2.receive(1000);
if (msg == null) {
msg = consumer2.receive(5000);
}
LOG.info("Received: " + msg);
assertNull("no messges left dangling but got: " + msg, msg);
connection.close();
} }
@Test @Test
public void testFailoverConsumerAckLost() throws Exception { public void testFailoverConsumerAckLost() throws Exception {
// as failure depends on hash order, do a few times // as failure depends on hash order, do a few times
@ -563,6 +463,93 @@ public class FailoverTransactionTest {
connection.close(); connection.close();
} }
@Test
public void testAutoRollbackWithMissingRedeliveries() throws Exception {
broker = createBroker(true);
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
Connection connection = cf.createConnection();
connection.start();
final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = consumerSession.createConsumer(destination);
produceMessage(producerSession, destination);
Message msg = consumer.receive(20000);
assertNotNull(msg);
broker.stop();
broker = createBroker(false);
// use empty jdbc store so that default wait for redeliveries will timeout after failover
setPersistenceAdapter(1);
broker.start();
try {
consumerSession.commit();
} catch (JMSException expectedRolledback) {
assertTrue(expectedRolledback instanceof TransactionRolledBackException);
}
broker.stop();
broker = createBroker(false);
broker.start();
assertNotNull("should get rolledback message from original restarted broker", consumer.receive(20000));
connection.close();
}
@Test
public void testWaitForMissingRedeliveries() throws Exception {
LOG.info("testWaitForMissingRedeliveries()");
broker = createBroker(true);
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.consumerFailoverRedeliveryWaitPeriod=30000");
Connection connection = cf.createConnection();
connection.start();
final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue destination = producerSession.createQueue(QUEUE_NAME);
final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = consumerSession.createConsumer(destination);
produceMessage(producerSession, destination);
Message msg = consumer.receive(20000);
if (msg == null) {
AutoFailTestSupport.dumpAllThreads("missing-");
}
assertNotNull("got message just produced", msg);
broker.stop();
broker = createBroker(false);
// use empty jdbc store so that wait for re-deliveries occur when failover resumes
setPersistenceAdapter(1);
broker.start();
final CountDownLatch commitDone = new CountDownLatch(1);
// will block pending re-deliveries
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("doing async commit...");
try {
consumerSession.commit();
commitDone.countDown();
} catch (JMSException ignored) {
}
}
});
broker.stop();
broker = createBroker(false);
broker.start();
assertTrue("commit was successfull", commitDone.await(30, TimeUnit.SECONDS));
assertNull("should not get committed message", consumer.receive(5000));
connection.close();
}
private void produceMessage(final Session producerSession, Queue destination) private void produceMessage(final Session producerSession, Queue destination)
throws JMSException { throws JMSException {
MessageProducer producer = producerSession.createProducer(destination); MessageProducer producer = producerSession.createProducer(destination);