From 8e59e249051b96f6f5204b3c261621550205023d Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Wed, 7 Oct 2009 17:41:21 +0000 Subject: [PATCH] resolve duplicate message issue from: https://issues.apache.org/activemq/browse/AMQ-2439 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@822811 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/region/AbstractRegion.java | 18 ++-- .../DemandForwardingBridgeSupport.java | 28 +++++-- .../activemq/network/DemandSubscription.java | 61 +++++++++----- .../org/apache/activemq/bugs/AMQ2439Test.java | 82 +++++++++++++++++++ 4 files changed, 147 insertions(+), 42 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2439Test.java diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java index 7daafc969a..c0548fb3ce 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java @@ -357,20 +357,12 @@ public abstract class AbstractRegion implements Region { public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { Subscription sub = consumerExchange.getSubscription(); if (sub == null) { - sub = subscriptions.get(ack.getConsumerId()); - + sub = subscriptions.get(ack.getConsumerId()); if (sub == null) { - //networked subscriptions are going to acknowledge in flight messages - //on behalf a subscription that is no more ... - if (!consumerExchange.getConnectionContext().isNetworkConnection() - && !consumerExchange.getConnectionContext() - .isInRecoveryMode()) { - throw new IllegalArgumentException( - "The subscription does not exist: " - + ack.getConsumerId()); - } else { - return; - } + LOG.warn("Ack for non existent subscription, ack:" + ack); + throw new IllegalArgumentException( + "The subscription does not exist: " + + ack.getConsumerId()); } consumerExchange.setSubscription(sub); } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 6f653cbfbb..ea8ec03f68 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -623,13 +623,24 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } } - protected void removeSubscription(DemandSubscription sub) throws IOException { + protected void removeSubscription(final DemandSubscription sub) throws IOException { if (sub != null) { if (LOG.isDebugEnabled()) { LOG.debug(configuration.getBrokerName() + " remove local subscription for remote " + sub.getRemoteInfo().getConsumerId()); } - localBroker.oneway(sub.getLocalInfo().createRemoveCommand()); subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); + + // continue removal in separate thread to free up this thread for outstanding responses + ASYNC_TASKS.execute(new Runnable() { + public void run() { + sub.waitForCompletion(); + try { + localBroker.oneway(sub.getLocalInfo().createRemoveCommand()); + } catch (IOException e) { + LOG.warn("failed to deliver remove command for local subscription, for remote " + sub.getRemoteInfo().getConsumerId(), e); + } + } + }); } } @@ -652,9 +663,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br if (command.isMessageDispatch()) { enqueueCounter.incrementAndGet(); final MessageDispatch md = (MessageDispatch)command; - DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId()); + final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId()); if (sub != null && md.getMessage()!=null) { - // See if this consumer's brokerPath tells us it came from the broker at the other end // of the bridge. I think we should be making this decision based on the message's // broker bread crumbs and not the consumer's? However, the message's broker bread @@ -685,8 +695,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } } localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); - dequeueCounter.incrementAndGet(); - + dequeueCounter.incrementAndGet(); + } else { // The message was not sent using async send, so we @@ -703,16 +713,20 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } else { localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); dequeueCounter.incrementAndGet(); + } } catch (IOException e) { serviceLocalException(e); + } finally { + sub.decrementOutstandingResponses(); } } }; remoteBroker.asyncRequest(message, callback); + sub.incrementOutstandingResponses(); } - + } else { if (LOG.isDebugEnabled()) { LOG.debug("No subscription registered with this network bridge for consumerId " + md.getConsumerId() + " for message: " + md.getMessage()); diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java index f60c86f2d0..0f53a885c4 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java @@ -18,10 +18,13 @@ package org.apache.activemq.network; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * Represents a network bridge interface @@ -29,10 +32,13 @@ import org.apache.activemq.command.ConsumerInfo; * @version $Revision: 1.1 $ */ public class DemandSubscription { + private static final Log LOG = LogFactory.getLog(DemandSubscription.class); + private final ConsumerInfo remoteInfo; private final ConsumerInfo localInfo; private Set remoteSubsIds = new CopyOnWriteArraySet(); private AtomicInteger dispatched = new AtomicInteger(0); + private AtomicBoolean activeWaiter = new AtomicBoolean(); DemandSubscription(ConsumerInfo info) { remoteInfo = info; @@ -68,27 +74,6 @@ public class DemandSubscription { return remoteSubsIds.isEmpty(); } - /** - * @return Returns the dispatched. - */ - public int getDispatched() { - return dispatched.get(); - } - - /** - * @param dispatched The dispatched to set. - */ - public void setDispatched(int dispatched) { - this.dispatched.set(dispatched); - } - - /** - * @return dispatched count after incremented - */ - public int incrementDispatched() { - return dispatched.incrementAndGet(); - } - /** * @return Returns the localInfo. */ @@ -102,5 +87,37 @@ public class DemandSubscription { */ public ConsumerInfo getRemoteInfo() { return remoteInfo; - } + } + + public void waitForCompletion() { + if (dispatched.get() > 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Waiting for completion for sub: " + localInfo.getConsumerId() + ", dispatched: " + this.dispatched.get()); + } + activeWaiter.set(true); + if (dispatched.get() > 0) { + synchronized (activeWaiter) { + try { + activeWaiter.wait(); + } catch (InterruptedException ignored) { + } + } + if (this.dispatched.get() > 0) { + LOG.warn("demand sub interrupted or timedout while waiting for outstanding responses, expect potentially " + this.dispatched.get() + " duplicate deliveried"); + } + } + } + } + + public void decrementOutstandingResponses() { + if (dispatched.decrementAndGet() == 0 && activeWaiter.get()) { + synchronized(activeWaiter) { + activeWaiter.notifyAll(); + } + } + } + + public void incrementOutstandingResponses() { + dispatched.incrementAndGet(); + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2439Test.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2439Test.java new file mode 100644 index 0000000000..5f474f2b79 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2439Test.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.bugs; + +import java.net.URI; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.activemq.JmsMultipleBrokersTestSupport; +import org.apache.activemq.broker.jmx.BrokerView; + +public class AMQ2439Test extends JmsMultipleBrokersTestSupport { + Destination dest; + + + public void testDuplicatesThroughNetwork() throws Exception { + assertEquals("received expected amount", 500, receiveExactMessages("BrokerB", 500)); + assertEquals("received expected amount", 500, receiveExactMessages("BrokerB", 500)); + validateQueueStats(); + } + + private void validateQueueStats() throws Exception { + BrokerView brokerView = brokers.get("BrokerA").broker.getAdminView(); + assertEquals("enequeue is correct", 1000, brokerView.getTotalEnqueueCount()); + assertEquals("dequeue is correct", 1000, brokerView.getTotalDequeueCount()); + } + + protected int receiveExactMessages(String brokerName, int msgCount) throws Exception { + + BrokerItem brokerItem = brokers.get(brokerName); + Connection connection = brokerItem.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(dest); + + Message msg; + int i; + for (i = 0; i < msgCount; i++) { + msg = consumer.receive(1000); + if (msg == null) { + break; + } + } + + connection.close(); + brokerItem.connections.remove(connection); + + return i; + } + + public void setUp() throws Exception { + super.setUp(); + createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=true&deleteAllMessagesOnStartup=true&advisorySupport=false")); + createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=true&deleteAllMessagesOnStartup=true&useJmx=false")); + bridgeBrokers("BrokerA", "BrokerB"); + + startAllBrokers(); + + // Create queue + dest = createDestination("TEST.FOO", false); + sendMessages("BrokerA", dest, 1000); + } +}