From 6778a49eecb2efe2af4fb6bb2a0d7c82f9861248 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Thu, 27 Oct 2011 10:21:20 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-3568 - Consumer auto acking of duplicate message dispatch can lead to Unmatched acknowledge: and redelivery. Using individual ack in this case. Added a warn, as this sort of duplicate shoud be trapped by the store producerAudit, maxFailoverProducersToTrack may need an increase git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1189700 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/ActiveMQMessageConsumer.java | 15 +- .../activemq/broker/TransportConnection.java | 2 +- .../activemq/command/ConnectionInfo.java | 2 +- .../failover/FailoverDuplicateTest.java | 252 ++++++++++++++++++ 4 files changed, 262 insertions(+), 9 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverDuplicateTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index cd83b24e0b..dc0d0fb3c2 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -1294,10 +1294,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } } else { if (!session.isTransacted()) { - if (LOG.isDebugEnabled()) { - LOG.debug(getConsumerId() + " ignoring (auto acking) duplicate: " + md.getMessage()); - } - MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); + LOG.warn("Duplicate dispatch on connection: " + session.getConnection().getConnectionInfo().getConnectionId() + + " to consumer: " + getConsumerId() + ", ignoring (auto acking) duplicate: " + md); + MessageAck ack = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1); session.sendAck(ack); } else { if (LOG.isDebugEnabled()) { @@ -1314,11 +1313,13 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } } if (needsPoisonAck) { - LOG.warn("acking duplicate delivery as poison, redelivery must be pending to another" - + " consumer on this connection, failoverRedeliveryWaitPeriod=" - + failoverRedeliveryWaitPeriod + ". Message: " + md); MessageAck poisonAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); poisonAck.setFirstMessageId(md.getMessage().getMessageId()); + poisonAck.setPoisonCause(new JMSException("Duplicate dispatch with transacted redeliver pending on another consumer, connection: " + + session.getConnection().getConnectionInfo().getConnectionId())); + LOG.warn("acking duplicate delivery as poison, redelivery must be pending to another" + + " consumer on this connection, failoverRedeliveryWaitPeriod=" + + failoverRedeliveryWaitPeriod + ". Message: " + md + ", poisonAck: " + poisonAck); session.sendAck(poisonAck); } else { if (transactedIndividualAck) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index 04e6d43655..3e468f3780 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -678,7 +678,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } } registerConnectionState(info.getConnectionId(), state); - LOG.debug("Setting up new connection id: " + info.getConnectionId() + ", address: " + getRemoteAddress()); + LOG.debug("Setting up new connection id: " + info.getConnectionId() + ", address: " + getRemoteAddress() + ", info: " + info); this.faultTolerantConnection=info.isFaultTolerant(); // Setup the context. String clientId = info.getClientId(); diff --git a/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java b/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java index 7c4a10078f..c87f2875ad 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java @@ -37,8 +37,8 @@ public class ConnectionInfo extends BaseCommand { protected boolean manageable; protected boolean clientMaster = true; protected boolean faultTolerant = false; + protected boolean failoverReconnect; protected transient Object transportContext; - private boolean failoverReconnect; public ConnectionInfo() { } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverDuplicateTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverDuplicateTest.java new file mode 100644 index 0000000000..797ca784a5 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverDuplicateTest.java @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.failover; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.TestSupport; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerPluginSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ProducerBrokerExchange; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.util.Wait; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FailoverDuplicateTest extends TestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(FailoverDuplicateTest.class); + private static final String QUEUE_NAME = "TestQueue"; + private static final String TRANSPORT_URI = "tcp://localhost:0"; + private String url; + BrokerService broker; + + + public void tearDown() throws Exception { + stopBroker(); + } + + public void stopBroker() throws Exception { + if (broker != null) { + broker.stop(); + } + } + + private void startCleanBroker() throws Exception { + startBroker(true); + } + + public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception { + broker = createBroker(deleteAllMessagesOnStartup); + broker.start(); + } + + public void startBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception { + broker = createBroker(deleteAllMessagesOnStartup, bindAddress); + broker.start(); + } + + public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception { + return createBroker(deleteAllMessagesOnStartup, TRANSPORT_URI); + } + + public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception { + broker = new BrokerService(); + broker.setUseJmx(false); + broker.setAdvisorySupport(false); + broker.addConnector(bindAddress); + broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup); + + url = broker.getTransportConnectors().get(0).getConnectUri().toString(); + + return broker; + } + + public void configureConnectionFactory(ActiveMQConnectionFactory factory) { + factory.setAuditMaximumProducerNumber(2048); + factory.setOptimizeAcknowledge(true); + } + + @SuppressWarnings("unchecked") + public void testFailoverSendReplyLost() throws Exception { + + broker = createBroker(true); + setDefaultPersistenceAdapter(broker); + + final CountDownLatch gotMessageLatch = new CountDownLatch(1); + final CountDownLatch producersDone = new CountDownLatch(1); + final AtomicBoolean first = new AtomicBoolean(false); + broker.setPlugins(new BrokerPlugin[]{ + new BrokerPluginSupport() { + @Override + public void send(final ProducerBrokerExchange producerExchange, + org.apache.activemq.command.Message messageSend) + throws Exception { + // so send will hang as if reply is lost + super.send(producerExchange, messageSend); + if (first.compareAndSet(false, true)) { + producerExchange.getConnectionContext().setDontSendReponse(true); + Executors.newSingleThreadExecutor().execute(new Runnable() { + public void run() { + try { + LOG.info("Waiting for recepit"); + assertTrue("message received on time", gotMessageLatch.await(60, TimeUnit.SECONDS)); + assertTrue("new producers done on time", producersDone.await(120, TimeUnit.SECONDS)); + LOG.info("Stopping connection post send and receive and multiple producers"); + producerExchange.getConnectionContext().getConnection().stop(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + } + } + } + }); + broker.start(); + + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.watchTopicAdvisories=false"); + configureConnectionFactory(cf); + Connection sendConnection = cf.createConnection(); + sendConnection.start(); + + final Session sendSession = sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Queue destination = sendSession.createQueue(QUEUE_NAME); + + + final AtomicInteger receivedCount = new AtomicInteger(); + MessageListener listener = new MessageListener() { + @Override + public void onMessage(Message message) { + gotMessageLatch.countDown(); + receivedCount.incrementAndGet(); + } + }; + Connection receiveConnection; + Session receiveSession = null; + receiveConnection = cf.createConnection(); + receiveConnection.start(); + receiveSession = receiveConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + receiveSession.createConsumer(destination).setMessageListener(listener); + + final CountDownLatch sendDoneLatch = new CountDownLatch(1); + // broker will die on send reply so this will hang till restart + Executors.newSingleThreadExecutor().execute(new Runnable() { + public void run() { + LOG.info("doing async send..."); + try { + produceMessage(sendSession, destination, "will resend", 1); + } catch (JMSException e) { + LOG.error("got send exception: ", e); + fail("got unexpected send exception" + e); + } + sendDoneLatch.countDown(); + LOG.info("done async send"); + } + }); + + + assertTrue("one message got through on time", gotMessageLatch.await(20, TimeUnit.SECONDS)); + // send more messages, blow producer audit + final int numProducers = 1050; + final int numPerProducer = 2; + final int totalSent = numPerProducer * numProducers + 1; + for (int i=0; i