From 3f4d4a5f7a4301ae41d91a123bbef3e3e9683cb4 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Tue, 1 Jul 2008 17:59:37 +0000 Subject: [PATCH] Fix for https://issues.apache.org/activemq/browse/AMQ-1585 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@673157 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/activemq/broker/ft/MasterBroker.java | 9 +++++---- .../activemq/broker/region/PrefetchSubscription.java | 4 +++- .../java/org/apache/activemq/broker/region/Queue.java | 6 +++++- .../apache/activemq/broker/ft/QueueMasterSlaveTest.java | 1 + 4 files changed, 14 insertions(+), 6 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java index adaa256f22..5f10a5e3a6 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java @@ -280,11 +280,13 @@ public class MasterBroker extends InsertableMutableBrokerFilter { } /** - * Notifiy the Broker that a dispatch has happened - * + * Notifiy the Broker that a dispatch will happen + * Do in 'pre' so that slave will avoid getting ack before dispatch + * similar logic to send() below. * @param messageDispatch */ - public void postProcessDispatch(MessageDispatch messageDispatch) { + public void preProcessDispatch(MessageDispatch messageDispatch) { + super.preProcessDispatch(messageDispatch); MessageDispatchNotification mdn = new MessageDispatchNotification(); mdn.setConsumerId(messageDispatch.getConsumerId()); mdn.setDeliverySequenceId(messageDispatch.getDeliverySequenceId()); @@ -294,7 +296,6 @@ public class MasterBroker extends InsertableMutableBrokerFilter { mdn.setMessageId(msg.getMessageId()); sendAsyncToSlave(mdn); } - super.postProcessDispatch(messageDispatch); } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 9c66a32737..acc64f7969 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -197,7 +197,9 @@ public abstract class PrefetchSubscription extends AbstractSubscription { if (!context.isInTransaction()) { dequeueCounter++; node.getRegionDestination().getDestinationStatistics().getDequeues().increment(); - node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); + if (!isSlave()) { + node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); + } removeList.add(node); } else { // setup a Synchronization to remove nodes from the diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 26a0d2a96d..15075cdc58 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1073,7 +1073,7 @@ public class Queue extends BaseDestination implements Task { } public void wakeup() { - if (optimizedDispatch) { + if (optimizedDispatch || isSlave()) { iterate(); }else { try { @@ -1085,6 +1085,10 @@ public class Queue extends BaseDestination implements Task { } + private boolean isSlave() { + return broker.getBrokerService().isSlave(); + } + private List doPageIn(boolean force) throws Exception { List result = null; dispatchLock.lock(); diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java index 4d224450f0..cde96b8dd1 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java @@ -47,6 +47,7 @@ public class QueueMasterSlaveTest extends JmsTopicSendReceiveWithTwoConnectionsT File file = new File("."); System.setProperty("basedir", file.getAbsolutePath()); } + super.messageCount = 500; failureCount = super.messageCount / 2; super.topic = isTopic(); createMaster();