git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@673157 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-07-01 17:59:37 +00:00
parent bdbced30d8
commit 3f4d4a5f7a
4 changed files with 14 additions and 6 deletions

View File

@ -280,11 +280,13 @@ public class MasterBroker extends InsertableMutableBrokerFilter {
}
/**
* Notifiy the Broker that a dispatch has happened
*
* Notifiy the Broker that a dispatch will happen
* Do in 'pre' so that slave will avoid getting ack before dispatch
* similar logic to send() below.
* @param messageDispatch
*/
public void postProcessDispatch(MessageDispatch messageDispatch) {
public void preProcessDispatch(MessageDispatch messageDispatch) {
super.preProcessDispatch(messageDispatch);
MessageDispatchNotification mdn = new MessageDispatchNotification();
mdn.setConsumerId(messageDispatch.getConsumerId());
mdn.setDeliverySequenceId(messageDispatch.getDeliverySequenceId());
@ -294,7 +296,6 @@ public class MasterBroker extends InsertableMutableBrokerFilter {
mdn.setMessageId(msg.getMessageId());
sendAsyncToSlave(mdn);
}
super.postProcessDispatch(messageDispatch);
}
/**

View File

@ -197,7 +197,9 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
if (!context.isInTransaction()) {
dequeueCounter++;
node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
if (!isSlave()) {
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
}
removeList.add(node);
} else {
// setup a Synchronization to remove nodes from the

View File

@ -1073,7 +1073,7 @@ public class Queue extends BaseDestination implements Task {
}
public void wakeup() {
if (optimizedDispatch) {
if (optimizedDispatch || isSlave()) {
iterate();
}else {
try {
@ -1085,6 +1085,10 @@ public class Queue extends BaseDestination implements Task {
}
private boolean isSlave() {
return broker.getBrokerService().isSlave();
}
private List<QueueMessageReference> doPageIn(boolean force) throws Exception {
List<QueueMessageReference> result = null;
dispatchLock.lock();

View File

@ -47,6 +47,7 @@ public class QueueMasterSlaveTest extends JmsTopicSendReceiveWithTwoConnectionsT
File file = new File(".");
System.setProperty("basedir", file.getAbsolutePath());
}
super.messageCount = 500;
failureCount = super.messageCount / 2;
super.topic = isTopic();
createMaster();