AMQ-7298 - rework redelivery message tracking to ensure no duplicate suppression (and dlq) in error for local transaction batches that failover

This commit is contained in:
gtully 2019-09-04 12:24:42 +01:00
parent 0ae247463f
commit 02548777c2
5 changed files with 515 additions and 77 deletions

View File

@ -111,6 +111,19 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
this.transactionId = transactionId;
}
}
class PreviouslyDelivered {
org.apache.activemq.command.Message message;
boolean redelivered;
PreviouslyDelivered(MessageDispatch messageDispatch) {
message = messageDispatch.getMessage();
}
PreviouslyDelivered(MessageDispatch messageDispatch, boolean redelivered) {
message = messageDispatch.getMessage();
this.redelivered = redelivered;
}
}
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageConsumer.class);
protected final ActiveMQSession session;
@ -124,7 +137,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
// Always walk list in reverse order.
protected final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();
// track duplicate deliveries in a transaction such that the tx integrity can be validated
private PreviouslyDeliveredMap<MessageId, Boolean> previouslyDeliveredMessages;
private PreviouslyDeliveredMap<MessageId, PreviouslyDelivered> previouslyDeliveredMessages;
private int deliveredCounter;
private int additionalWindowSize;
private long redeliveryDelay;
@ -144,7 +157,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
private final AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
private ExecutorService executorService;
private MessageTransformer transformer;
private boolean clearDeliveredList;
private volatile boolean clearDeliveredList;
AtomicInteger inProgressClearRequiredFlag = new AtomicInteger(0);
private MessageAck pendingAck;
@ -766,14 +779,14 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
// allow dispatch on this connection to resume
session.connection.transportInterruptionProcessingComplete();
inProgressClearRequiredFlag.decrementAndGet();
inProgressClearRequiredFlag.set(0);
// Wake up any blockers and allow them to recheck state.
unconsumedMessages.getMutex().notifyAll();
}
}
clearDeliveredList();
}
clearDeliveredList();
}
void deliverAcks() {
@ -869,6 +882,12 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
}
}
if (previouslyDeliveredMessages != null) {
for (PreviouslyDelivered previouslyDelivered : previouslyDeliveredMessages.values()) {
session.connection.rollbackDuplicate(this, previouslyDelivered.message);
}
}
clearPreviouslyDelivered();
}
/**
@ -1141,8 +1160,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
numberNotReplayed = 0;
synchronized(deliveredMessages) {
if (previouslyDeliveredMessages != null) {
for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
if (!entry.getValue()) {
for (PreviouslyDelivered entry: previouslyDeliveredMessages.values()) {
if (!entry.redelivered) {
numberNotReplayed++;
}
}
@ -1169,11 +1188,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
// if any previously delivered messages was not re-delivered, transaction is invalid and must rollback
// as messages have been dispatched else where.
int numberNotReplayed = 0;
for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
if (!entry.getValue()) {
for (PreviouslyDelivered entry: previouslyDeliveredMessages.values()) {
if (!entry.redelivered) {
numberNotReplayed++;
LOG.debug("previously delivered message has not been replayed in transaction: {}, messageId: {}",
previouslyDeliveredMessages.transactionId, entry.getKey());
previouslyDeliveredMessages.transactionId, entry.message.getMessageId());
}
}
if (numberNotReplayed > 0) {
@ -1244,8 +1263,6 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
MessageDispatch md = iter.next();
md.getMessage().onMessageRolledBack();
// ensure we don't filter this as a duplicate
session.connection.rollbackDuplicate(this, md.getMessage());
}
if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
@ -1275,17 +1292,18 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
session.sendAck(ack,true);
}
// stop the delivery of messages.
if (nonBlockingRedelivery) {
if (!unconsumedMessages.isClosed()) {
final LinkedList<MessageDispatch> pendingSessionRedelivery =
new LinkedList<MessageDispatch>(deliveredMessages);
final LinkedList<MessageDispatch> pendingRedeliveries =
new LinkedList<MessageDispatch>(deliveredMessages);
captureDeliveredMessagesForDuplicateSuppressionWithRequireRedelivery(false);
Collections.reverse(pendingRedeliveries);
deliveredCounter -= deliveredMessages.size();
deliveredMessages.clear();
deliveredCounter -= deliveredMessages.size();
deliveredMessages.clear();
if (!unconsumedMessages.isClosed()) {
if (nonBlockingRedelivery) {
Collections.reverse(pendingSessionRedelivery);
// Start up the delivery again a little later.
session.getScheduler().executeAfterDelay(new Runnable() {
@ -1293,7 +1311,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
public void run() {
try {
if (!unconsumedMessages.isClosed()) {
for(MessageDispatch dispatch : pendingRedeliveries) {
for(MessageDispatch dispatch : pendingSessionRedelivery) {
session.dispatch(dispatch);
}
}
@ -1302,42 +1320,52 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
}
}, redeliveryDelay);
}
} else {
unconsumedMessages.stop();
} else {
// stop the delivery of messages.
unconsumedMessages.stop();
for (MessageDispatch md : deliveredMessages) {
unconsumedMessages.enqueueFirst(md);
}
final ActiveMQMessageConsumer dispatcher = this;
deliveredCounter -= deliveredMessages.size();
deliveredMessages.clear();
if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) {
// Start up the delivery again a little later.
session.getScheduler().executeAfterDelay(new Runnable() {
Runnable redispatchWork = new Runnable() {
@Override
public void run() {
try {
if (started.get()) {
start();
if (!unconsumedMessages.isClosed()) {
synchronized (unconsumedMessages.getMutex()) {
for (MessageDispatch md : pendingSessionRedelivery) {
unconsumedMessages.enqueueFirst(md);
}
if (messageListener.get() != null) {
session.redispatch(dispatcher, unconsumedMessages);
}
}
if (started.get()) {
start();
}
}
} catch (JMSException e) {
session.connection.onAsyncException(e);
}
}
}, redeliveryDelay);
} else {
start();
};
if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) {
// Start up the delivery again a little later.
session.getScheduler().executeAfterDelay(redispatchWork, redeliveryDelay);
} else {
redispatchWork.run();
}
}
} else {
for (MessageDispatch md : pendingSessionRedelivery) {
session.connection.rollbackDuplicate(this, md.getMessage());
}
}
}
}
}
if (messageListener.get() != null) {
session.redispatch(this, unconsumedMessages);
}
}
/*
@ -1347,10 +1375,10 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
*/
private void rollbackPreviouslyDeliveredAndNotRedelivered() {
if (previouslyDeliveredMessages != null) {
for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
if (!entry.getValue()) {
LOG.trace("rollback non redelivered: {}" + entry.getKey());
removeFromDeliveredMessages(entry.getKey());
for (PreviouslyDelivered entry: previouslyDeliveredMessages.values()) {
if (!entry.redelivered) {
LOG.trace("rollback non redelivered: {}", entry.message.getMessageId());
removeFromDeliveredMessages(entry.message.getMessageId());
}
}
clearPreviouslyDelivered();
@ -1390,7 +1418,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
clearDeliveredList();
synchronized (unconsumedMessages.getMutex()) {
if (!unconsumedMessages.isClosed()) {
if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
// deliverySequenceId non zero means previously queued dispatch
if (this.info.isBrowser() || md.getDeliverySequenceId() != 0l || !session.connection.isDuplicate(this, md.getMessage())) {
if (listener != null && unconsumedMessages.isRunning()) {
if (redeliveryExceeded(md)) {
posionAck(md, "listener dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
@ -1416,11 +1445,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
}
} else {
if (!unconsumedMessages.isRunning()) {
// delayed redelivery, ensure it can be re delivered
session.connection.rollbackDuplicate(this, md.getMessage());
}
md.setDeliverySequenceId(-1); // skip duplicate check on subsequent queued delivery
if (md.getMessage() == null) {
// End of browse or pull request timeout.
unconsumedMessages.enqueue(md);
@ -1476,9 +1501,10 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
if (session.isTransacted()) {
synchronized (deliveredMessages) {
if (previouslyDeliveredMessages != null) {
if (previouslyDeliveredMessages.containsKey(md.getMessage().getMessageId())) {
PreviouslyDelivered entry;
if ((entry = previouslyDeliveredMessages.get(md.getMessage().getMessageId())) != null) {
if (markReceipt) {
previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true);
entry.redelivered = true;
}
return true;
}
@ -1506,15 +1532,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
if (clearDeliveredList) {
if (!deliveredMessages.isEmpty()) {
if (session.isTransacted()) {
if (previouslyDeliveredMessages == null) {
previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId, Boolean>(session.getTransactionContext().getTransactionId());
}
for (MessageDispatch delivered : deliveredMessages) {
previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false);
}
LOG.debug("{} tracking existing transacted {} delivered list ({}) on transport interrupt",
getConsumerId(), previouslyDeliveredMessages.transactionId, deliveredMessages.size());
captureDeliveredMessagesForDuplicateSuppression();
} else {
if (session.isClientAcknowledge()) {
LOG.debug("{} rolling back delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size());
@ -1536,6 +1554,21 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
}
// called with deliveredMessages locked
private void captureDeliveredMessagesForDuplicateSuppression() {
captureDeliveredMessagesForDuplicateSuppressionWithRequireRedelivery (true);
}
private void captureDeliveredMessagesForDuplicateSuppressionWithRequireRedelivery(boolean requireRedelivery) {
if (previouslyDeliveredMessages == null) {
previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId, PreviouslyDelivered>(session.getTransactionContext().getTransactionId());
}
for (MessageDispatch delivered : deliveredMessages) {
previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), new PreviouslyDelivered(delivered, !requireRedelivery));
}
LOG.trace("{} tracking existing transacted {} delivered list({})", getConsumerId(), previouslyDeliveredMessages.transactionId, deliveredMessages.size());
}
public int getMessageSize() {
return unconsumedMessages.size();
}

View File

@ -2097,9 +2097,6 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages) throws JMSException {
List<MessageDispatch> c = unconsumedMessages.removeAll();
for (MessageDispatch md : c) {
this.connection.rollbackDuplicate(dispatcher, md.getMessage());
}
Collections.reverse(c);
for (Iterator<MessageDispatch> iter = c.iterator(); iter.hasNext();) {

View File

@ -33,6 +33,7 @@ import java.util.concurrent.TimeoutException;
import javax.jms.*;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@ -49,6 +50,8 @@ import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.activemq.broker.region.BaseDestination.DUPLICATE_FROM_STORE_MSG_PREFIX;
import static org.apache.activemq.command.ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY;
import static org.junit.Assert.*;
interface Configurer {
@ -91,6 +94,7 @@ public class AMQ2149Test {
public void createBroker(Configurer configurer) throws Exception {
broker = new BrokerService();
broker.setAdvisorySupport(false);
configurePersistenceAdapter(broker);
broker.getSystemUsage().getMemoryUsage().setLimit(MESSAGE_LENGTH_BYTES * 200 * NUM_SENDERS_AND_RECEIVERS);
@ -164,8 +168,9 @@ public class AMQ2149Test {
public Receiver(javax.jms.Destination dest, boolean transactional) throws JMSException {
this.dest = dest;
this.transactional = transactional;
connection = new ActiveMQConnectionFactory(brokerURL)
.createConnection();
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
connectionFactory.setWatchTopicAdvisories(false);
connection = connectionFactory.createConnection();
connection.setClientID(dest.toString());
session = connection.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
if (ActiveMQDestination.transform(dest).isTopic()) {
@ -224,7 +229,7 @@ public class AMQ2149Test {
lastId = message.getJMSMessageID();
} catch (TransactionRolledBackException expectedSometimesOnFailoverRecovery) {
++nextExpectedSeqNum;
LOG.info("got rollback: " + expectedSometimesOnFailoverRecovery);
LOG.info("got rollback: " + expectedSometimesOnFailoverRecovery, expectedSometimesOnFailoverRecovery);
if (expectedSometimesOnFailoverRecovery.getMessage().contains("completion in doubt")) {
// in doubt - either commit command or reply missing
// don't know if we will get a replay
@ -235,7 +240,17 @@ public class AMQ2149Test {
// batch will be replayed
nextExpectedSeqNum -= TRANSACITON_BATCH;
}
} catch (JMSException expectedSometimesOnFailoverRecoveryWithNestedTransactionRolledBackException) {
++nextExpectedSeqNum;
LOG.info("got rollback: " + expectedSometimesOnFailoverRecoveryWithNestedTransactionRolledBackException, expectedSometimesOnFailoverRecoveryWithNestedTransactionRolledBackException);
if (expectedSometimesOnFailoverRecoveryWithNestedTransactionRolledBackException.getMessage().contains("xaErrorCode:100")) {
resumeOnNextOrPreviousIsOk = false;
// batch will be replayed
nextExpectedSeqNum -= TRANSACITON_BATCH;
} else {
LOG.error(dest + " onMessage error:" + expectedSometimesOnFailoverRecoveryWithNestedTransactionRolledBackException);
exceptions.add(expectedSometimesOnFailoverRecoveryWithNestedTransactionRolledBackException);
}
} catch (Throwable e) {
LOG.error(dest + " onMessage error:" + e);
exceptions.add(e);
@ -258,8 +273,9 @@ public class AMQ2149Test {
public Sender(javax.jms.Destination dest) throws JMSException {
this.dest = dest;
connection = new ActiveMQConnectionFactory(brokerURL)
.createConnection();
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
activeMQConnectionFactory.setWatchTopicAdvisories(false);
connection = activeMQConnectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
messageProducer = session.createProducer(dest);
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
@ -360,7 +376,7 @@ public class AMQ2149Test {
});
verifyOrderedMessageReceipt();
verifyStats(false);
verifyStats(false, false);
}
@Test(timeout = 10 * 60 * 1000)
@ -383,7 +399,7 @@ public class AMQ2149Test {
timer.cancel();
}
verifyStats(true);
verifyStats(true, false);
}
@Test(timeout = 10 * 60 * 1000)
@ -403,7 +419,7 @@ public class AMQ2149Test {
timer.cancel();
}
verifyStats(true);
verifyStats(true, false);
}
@Test(timeout = 10 * 60 * 1000)
@ -436,10 +452,10 @@ public class AMQ2149Test {
timer.cancel();
}
verifyStats(true);
verifyStats(true, true);
}
private void verifyStats(boolean brokerRestarts) throws Exception {
private void verifyStats(boolean brokerRestarts, boolean transactional) throws Exception {
RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
for (Destination dest : regionBroker.getQueueRegion().getDestinationMap().values()) {
@ -455,6 +471,27 @@ public class AMQ2149Test {
stats.getEnqueues().getCount(), stats.getDequeues().getCount());
}
}
Destination activeMQDlq = regionBroker.getQueueRegion().getDestinationMap().get(new ActiveMQQueue("ActiveMQ.DLQ"));
if (activeMQDlq != null) {
// excuse duplicates from the store
int countToExcuse = 0;
org.apache.activemq.command.Message[] messages = activeMQDlq.browse();
for (org.apache.activemq.command.Message candidate: messages) {
final Object cause = candidate.getProperty(DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
if (cause!= null &&
((((String)cause).contains(DUPLICATE_FROM_STORE_MSG_PREFIX)) ||
!transactional && ((String)cause).contains("Suppressing duplicate delivery on connection"))) {
// expected some duplicate sends for durable subs
countToExcuse++;
} else {
LOG.error("Unexpected dlq: " + cause + ", " + candidate);
}
}
assertEquals("no unexpcted dlq messages", countToExcuse ,
activeMQDlq.getDestinationStatistics().getMessages().getCount());
}
}
private TimerTask schedualRestartTask(final Timer timer, final Configurer configurer) {

View File

@ -0,0 +1,368 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.failover;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.*;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.*;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.*;
import javax.jms.Message;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@RunWith(Parameterized.class)
public class FailoverDurableSubTransactionTest {
private static final Logger LOG = LoggerFactory.getLogger(FailoverDurableSubTransactionTest.class);
private static final String TOPIC_NAME = "Failover.WithTx";
private static final String TRANSPORT_URI = "tcp://localhost:0";
private String url;
BrokerService broker;
public enum FailType {
ON_DISPATCH,
ON_ACK,
ON_COMMIT,
ON_DISPACH_WITH_REPLAY_DELAY
}
@Parameterized.Parameter(0)
public FailType failType;
@Parameterized.Parameters(name ="failType=#{0}")
public static Iterable<Object[]> parameters() {
return Arrays.asList(new Object[][]{
{FailType.ON_DISPATCH},
{FailType.ON_DISPACH_WITH_REPLAY_DELAY},
{FailType.ON_ACK},
{FailType.ON_COMMIT}
});
}
@After
public void tearDown() throws Exception {
stopBroker();
}
public void stopBroker() throws Exception {
if (broker != null) {
broker.stop();
}
}
public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
broker = createBroker(deleteAllMessagesOnStartup);
broker.start();
}
public void startBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
broker = createBroker(deleteAllMessagesOnStartup, bindAddress);
broker.start();
}
public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
return createBroker(deleteAllMessagesOnStartup, TRANSPORT_URI);
}
public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
broker = new BrokerService();
broker.setUseJmx(false);
broker.setAdvisorySupport(false);
broker.addConnector(bindAddress);
broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
PolicyMap policyMap = new PolicyMap();
PolicyEntry defaultEntry = new PolicyEntry();
policyMap.setDefaultEntry(defaultEntry);
broker.setDestinationPolicy(policyMap);
// faster redispatch
broker.setKeepDurableSubsActive(true);
url = broker.getTransportConnectors().get(0).getConnectUri().toString();
return broker;
}
public void configureConnectionFactory(ActiveMQConnectionFactory factory) {
factory.setWatchTopicAdvisories(false);
factory.getRedeliveryPolicy().setMaximumRedeliveries(-1);
if (!FailType.ON_DISPACH_WITH_REPLAY_DELAY.equals(failType)) {
factory.getRedeliveryPolicy().setInitialRedeliveryDelay(0l);
factory.getRedeliveryPolicy().setRedeliveryDelay(0l);
}
}
@org.junit.Test
public void testFailoverCommit() throws Exception {
final AtomicInteger dispatchedCount = new AtomicInteger(0);
final int errorAt = FailType.ON_COMMIT.equals(failType) ? 1 : 9;
final int messageCount = 10;
broker = createBroker(true);
broker.setPlugins(new BrokerPlugin[]{
new BrokerPluginSupport() {
@Override
public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
if (FailType.ON_COMMIT.equals(failType) && dispatchedCount.incrementAndGet() == errorAt) {
for (TransportConnection transportConnection : broker.getTransportConnectors().get(0).getConnections()) {
LOG.error("Whacking connection on commit: " + transportConnection);
transportConnection.serviceException(new IOException("ERROR NOW"));
}
} else {
super.commitTransaction(context, xid, onePhase);
}
}
@Override
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
if (FailType.ON_ACK.equals(failType) && ack.getAckType() == MessageAck.DELIVERED_ACK_TYPE && dispatchedCount.incrementAndGet() == errorAt) {
for (TransportConnection transportConnection : broker.getTransportConnectors().get(0).getConnections()) {
LOG.error("Whacking connection on ack: " + transportConnection);
transportConnection.serviceException(new IOException("ERROR NOW"));
}
}
super.acknowledge(consumerExchange, ack);
}
@Override
public void postProcessDispatch(MessageDispatch messageDispatch) {
super.postProcessDispatch(messageDispatch);
if ((FailType.ON_DISPATCH.equals(failType) || FailType.ON_DISPACH_WITH_REPLAY_DELAY.equals(failType)) && dispatchedCount.incrementAndGet() == errorAt) {
for (TransportConnection transportConnection : broker.getTransportConnectors().get(0).getConnections()) {
LOG.error("Whacking connection on dispatch: " + transportConnection);
transportConnection.serviceException(new IOException("ERROR NOW"));
}
}
}
}
});
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
cf.setAlwaysSyncSend(true);
cf.setAlwaysSessionAsync(false);
cf.getPrefetchPolicy().setDurableTopicPrefetch(FailType.ON_ACK.equals(failType) ? 2 : 100);
configureConnectionFactory(cf);
Connection connection = cf.createConnection();
connection.setClientID("CID");
connection.start();
final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic destination = session.createTopic(TOPIC_NAME);
MessageConsumer consumer = session.createDurableSubscriber(destination, "DS");
consumer.close();
produceMessage(destination, messageCount);
LOG.info("Production done! " + broker.getDestination(ActiveMQDestination.transform(destination)));
consumer = session.createDurableSubscriber(destination, "DS");
AtomicBoolean success = new AtomicBoolean(false);
HashSet<Integer> dupCheck = new HashSet<Integer>();
while (!success.get()) {
dupCheck.clear();
int i = 0;
for (i = 0; i < messageCount; i++) {
Message msg = consumer.receive(5000);
if (msg == null) {
LOG.info("Failed to receive on: " + i);
break;
}
LOG.info("Received: @" + i + ":" + msg.getJMSMessageID() + ", ID:" + msg.getIntProperty("ID"));
assertTrue("single instance of: " + i, dupCheck.add( msg.getIntProperty("ID")));
}
try {
if (i == messageCount) {
session.commit();
success.set(true);
} else {
session.rollback();
}
} catch (TransactionRolledBackException expected) {
LOG.info("Got expected", expected);
session.rollback();
}
}
consumer.close();
connection.close();
org.apache.activemq.broker.region.Destination dlq = broker.getDestination(ActiveMQDestination.transform(new ActiveMQQueue(DEFAULT_DEAD_LETTER_QUEUE_NAME)));
LOG.info("DLQ: " + dlq);
assertEquals("DLQ empty ", 0, dlq.getDestinationStatistics().getMessages().getCount());
}
@org.junit.Test
public void testFailoverCommitListener() throws Exception {
final AtomicInteger dispatchedCount = new AtomicInteger(0);
final int errorAt = FailType.ON_ACK.equals(failType) ? 1 : 1;
final int messageCount = 10;
broker = createBroker(true);
broker.setPlugins(new BrokerPlugin[]{
new BrokerPluginSupport() {
@Override
public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
LOG.info("commit request: " + xid);
if (FailType.ON_COMMIT.equals(failType) && dispatchedCount.incrementAndGet() == errorAt) {
for (TransportConnection transportConnection : broker.getTransportConnectors().get(0).getConnections()) {
LOG.error("Whacking connection on commit: " + transportConnection);
transportConnection.serviceException(new IOException("ERROR NOW"));
}
} else {
super.commitTransaction(context, xid, onePhase);
}
}
@Override
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
LOG.info("ack request: " + ack);
if (FailType.ON_ACK.equals(failType) /*&& ack.getAckType() == MessageAck.DELIVERED_ACK_TYPE*/ && dispatchedCount.incrementAndGet() == errorAt) {
for (TransportConnection transportConnection : broker.getTransportConnectors().get(0).getConnections()) {
LOG.error("Whacking connection on ack: " + transportConnection);
transportConnection.serviceException(new IOException("ERROR NOW"));
}
} else {
super.acknowledge(consumerExchange, ack);
}
}
}
});
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
cf.setAlwaysSyncSend(true);
cf.setAlwaysSessionAsync(true);
//cf.getPrefetchPolicy().setDurableTopicPrefetch(FailType.ON_ACK.equals(failType) ? 2 : 100);
cf.setWatchTopicAdvisories(false);
Connection connection = cf.createConnection();
connection.setClientID("CID");
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic destination = session.createTopic(TOPIC_NAME);
MessageConsumer consumer = session.createDurableSubscriber(destination, "DS");
consumer.close();
connection.close();
produceMessage(destination, messageCount*2);
LOG.info("Production done! " + broker.getDestination(ActiveMQDestination.transform(destination)));
connection = cf.createConnection();
connection.setClientID("CID");
connection.start();
final Session receiveSession = connection.createSession(true, Session.SESSION_TRANSACTED);
consumer = receiveSession.createDurableSubscriber(destination, "DS");
AtomicBoolean success = new AtomicBoolean(false);
HashSet<Integer> dupCheck = new HashSet<Integer>();
final AtomicInteger receivedCount = new AtomicInteger();
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message msg) {
try {
int i = receivedCount.getAndIncrement();
LOG.info("Received: @" + i + ":" + msg.getJMSMessageID() + ", ID:" + msg.getIntProperty("ID"));
assertTrue("single instance of: " + i, dupCheck.add( msg.getIntProperty("ID")));
if (receivedCount.get() == messageCount) {
receiveSession.commit();
success.set(true);
}
} catch (TransactionRolledBackException expected) {
LOG.info("Got expected", expected);
try {
receiveSession.rollback();
} catch (JMSException e) {
e.printStackTrace();
}
dupCheck.clear();
receivedCount.set(0);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
connection.start();
try {
assertTrue("committed ok", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return success.get();
}
}));
} finally {
consumer.close();
connection.close();
}
org.apache.activemq.broker.region.Destination dlq = broker.getDestination(ActiveMQDestination.transform(new ActiveMQQueue(DEFAULT_DEAD_LETTER_QUEUE_NAME)));
LOG.info("DLQ: " + dlq);
assertEquals("DLQ empty ", 0, dlq.getDestinationStatistics().getMessages().getCount());
}
private void produceMessage(Topic destination, int count)
throws JMSException {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(url);
configureConnectionFactory(cf);
Connection connection = cf.createConnection();
connection.start();
Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(destination);
TextMessage message = producerSession.createTextMessage("Test message");
for (int i=0; i<count; i++) {
message.setIntProperty("ID", i);
producer.send(message);
}
connection.close();
}
}

View File

@ -35,6 +35,7 @@ import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
@ -313,8 +314,10 @@ public class TopicProducerFlowControlTest extends TestCase implements MessageLis
c.close();
// verify no pending sends completed in rolledback tx
assertEquals("nothing sent during close", enqueueCountWhenBlocked, broker.getDestination(ActiveMQDestination.transform(destination)).getDestinationStatistics().getEnqueues().getCount());
// temp dest should not exist
if (!ActiveMQDestination.transform(destination).isTemporary()) {
assertEquals("nothing sent during close", enqueueCountWhenBlocked, broker.getDestination(ActiveMQDestination.transform(destination)).getDestinationStatistics().getEnqueues().getCount());
}
} finally {
log4jLogger.removeAppender(appender);
}