mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-5279 - ensure poison on failover redelivery only when delivery is not pending else where
This commit is contained in:
parent
4da588d4fc
commit
c34851fd57
|
@ -2215,7 +2215,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
|||
@Override
|
||||
@Deprecated
|
||||
public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException {
|
||||
return createInputStream(dest, messageSelector, noLocal, -1);
|
||||
return createInputStream(dest, messageSelector, noLocal, -1);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -2571,6 +2571,10 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
|||
return this.executor;
|
||||
}
|
||||
|
||||
protected CopyOnWriteArrayList<ActiveMQSession> getSessions() {
|
||||
return sessions;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the checkForDuplicates
|
||||
*/
|
||||
|
|
|
@ -1413,36 +1413,24 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
|||
}
|
||||
}
|
||||
} else {
|
||||
if (!session.isTransacted()) {
|
||||
LOG.warn("Duplicate non transacted dispatch to consumer: " + getConsumerId() + ", poison acking: " + md);
|
||||
posionAck(md, "Duplicate non transacted delivery to " + getConsumerId());
|
||||
} else {
|
||||
// deal with duplicate delivery
|
||||
ConsumerId consumerWithPendingTransaction;
|
||||
if (redeliveryExpectedInCurrentTransaction(md, true)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(getConsumerId() + " tracking transacted redelivery of duplicate: " + md.getMessage());
|
||||
LOG.debug("{} tracking transacted redelivery {}", getConsumerId(), md.getMessage());
|
||||
}
|
||||
boolean needsPoisonAck = false;
|
||||
synchronized (deliveredMessages) {
|
||||
if (previouslyDeliveredMessages != null) {
|
||||
previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true);
|
||||
} else {
|
||||
// delivery while pending redelivery to another consumer on the same connection
|
||||
// not waiting for redelivery will help here
|
||||
needsPoisonAck = true;
|
||||
}
|
||||
}
|
||||
if (needsPoisonAck) {
|
||||
LOG.warn("acking duplicate delivery as poison, redelivery must be pending to another"
|
||||
+ " consumer on this connection, failoverRedeliveryWaitPeriod="
|
||||
+ failoverRedeliveryWaitPeriod + ". Message: " + md);
|
||||
posionAck(md, "Duplicate dispatch with transacted redeliver pending on another consumer, connection: "
|
||||
+ session.getConnection().getConnectionInfo().getConnectionId());
|
||||
if (transactedIndividualAck) {
|
||||
immediateIndividualTransactedAck(md);
|
||||
} else {
|
||||
if (transactedIndividualAck) {
|
||||
immediateIndividualTransactedAck(md);
|
||||
} else {
|
||||
session.sendAck(new MessageAck(md, MessageAck.DELIVERED_ACK_TYPE, 1));
|
||||
}
|
||||
session.sendAck(new MessageAck(md, MessageAck.DELIVERED_ACK_TYPE, 1));
|
||||
}
|
||||
} else if ((consumerWithPendingTransaction = redeliveryPendingInCompetingTransaction(md)) != null) {
|
||||
LOG.warn("{} delivering duplicate {}, pending transaction completion on {} will rollback", getConsumerId(), md.getMessage(), consumerWithPendingTransaction);
|
||||
session.getConnection().rollbackDuplicate(this, md.getMessage());
|
||||
dispatch(md);
|
||||
} else {
|
||||
LOG.warn("{} suppressing duplicate delivery on connection, poison acking: {}", getConsumerId(), md);
|
||||
posionAck(md, "Suppressing duplicate delivery on connection, consumer " + getConsumerId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1456,6 +1444,33 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
|||
}
|
||||
}
|
||||
|
||||
private boolean redeliveryExpectedInCurrentTransaction(MessageDispatch md, boolean markReceipt) {
|
||||
if (session.isTransacted()) {
|
||||
synchronized (deliveredMessages) {
|
||||
if (previouslyDeliveredMessages != null) {
|
||||
if (previouslyDeliveredMessages.containsKey(md.getMessage().getMessageId())) {
|
||||
if (markReceipt) {
|
||||
previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private ConsumerId redeliveryPendingInCompetingTransaction(MessageDispatch md) {
|
||||
for (ActiveMQSession activeMQSession: session.connection.getSessions()) {
|
||||
for (ActiveMQMessageConsumer activeMQMessageConsumer : activeMQSession.consumers) {
|
||||
if (activeMQMessageConsumer.redeliveryExpectedInCurrentTransaction(md, false)) {
|
||||
return activeMQMessageConsumer.getConsumerId();
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
// async (on next call) clear or track delivered as they may be flagged as duplicates if they arrive again
|
||||
private void clearDeliveredList() {
|
||||
if (clearDeliveredList) {
|
||||
|
|
|
@ -831,7 +831,7 @@ public class FailoverTransactionTest extends TestSupport {
|
|||
setDefaultPersistenceAdapter(broker);
|
||||
broker.start();
|
||||
|
||||
assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
|
||||
assertTrue("tx committed through failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
|
||||
|
||||
LOG.info("received message count: " + receivedMessages.size());
|
||||
|
||||
|
@ -841,7 +841,7 @@ public class FailoverTransactionTest extends TestSupport {
|
|||
if (gotTransactionRolledBackException.get()) {
|
||||
assertNotNull("should be available again after commit rollback ex", msg);
|
||||
} else {
|
||||
assertNull("should be nothing left for consumer as recieve should have committed", msg);
|
||||
assertNull("should be nothing left for consumer as receive should have committed", msg);
|
||||
}
|
||||
consumerSession1.commit();
|
||||
|
||||
|
@ -1103,8 +1103,8 @@ public class FailoverTransactionTest extends TestSupport {
|
|||
connection.close();
|
||||
}
|
||||
|
||||
public void testPoisonOnDeliveryWhilePending() throws Exception {
|
||||
LOG.info("testPoisonOnDeliveryWhilePending()");
|
||||
public void testReDeliveryWhilePending() throws Exception {
|
||||
LOG.info("testReDeliveryWhilePending()");
|
||||
broker = createBroker(true);
|
||||
broker.start();
|
||||
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.consumerFailoverRedeliveryWaitPeriod=10000");
|
||||
|
@ -1134,8 +1134,7 @@ public class FailoverTransactionTest extends TestSupport {
|
|||
|
||||
final Vector<Exception> exceptions = new Vector<Exception>();
|
||||
|
||||
// commit may fail if other consumer gets the message on restart, it will be seen as a duplicate on the connection
|
||||
// but with no transaction and it pending on another consumer it will be poison
|
||||
// commit may fail if other consumer gets the message on restart
|
||||
Executors.newSingleThreadExecutor().execute(new Runnable() {
|
||||
public void run() {
|
||||
LOG.info("doing async commit...");
|
||||
|
@ -1149,24 +1148,24 @@ public class FailoverTransactionTest extends TestSupport {
|
|||
}
|
||||
});
|
||||
|
||||
assertNull("consumer2 not get a message while pending to 1 or consumed by 1", consumer2.receive(2000));
|
||||
|
||||
assertTrue("commit completed ", commitDone.await(15, TimeUnit.SECONDS));
|
||||
|
||||
// either message consumed or sent to dlq via poison on redelivery to wrong consumer
|
||||
// message should not be available again in any event
|
||||
// either message redelivered in existing tx or consumed by consumer2
|
||||
// should not be available again in any event
|
||||
assertNull("consumer should not get rolled back on non redelivered message or duplicate", consumer.receive(5000));
|
||||
|
||||
// consumer replay is hashmap order dependent on a failover connection state recover so need to deal with both cases
|
||||
if (exceptions.isEmpty()) {
|
||||
// commit succeeded, message was redelivered to the correct consumer after restart so commit was fine
|
||||
LOG.info("commit succeeded, message was redelivered to the correct consumer after restart so commit was fine");
|
||||
assertNull("consumer2 not get a second message consumed by 1", consumer2.receive(2000));
|
||||
} else {
|
||||
// message should be in dlq
|
||||
MessageConsumer dlqConsumer = consumerSession.createConsumer(consumerSession.createQueue("ActiveMQ.DLQ"));
|
||||
TextMessage dlqMessage = (TextMessage) dlqConsumer.receive(5000);
|
||||
assertNotNull("found message in dlq", dlqMessage);
|
||||
assertEquals("text matches", "Test message", dlqMessage.getText());
|
||||
LOG.info("commit failed, consumer2 should get it", exceptions.get(0));
|
||||
assertNotNull("consumer2 got message", consumer2.receive(2000));
|
||||
consumerSession.commit();
|
||||
// no message should be in dlq
|
||||
MessageConsumer dlqConsumer = consumerSession.createConsumer(consumerSession.createQueue("ActiveMQ.DLQ"));
|
||||
assertNull("nothing in the dlq", dlqConsumer.receive(5000));
|
||||
}
|
||||
connection.close();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue